Diffstat (limited to 'exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java | 77
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java | 111
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java | 251
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java | 73
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java | 58
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java | 115
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java | 170
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java | 63
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java | 88
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java | 65
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java | 66
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java | 433
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java | 371
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java | 570
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java | 102
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java | 122
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java | 151
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java | 58
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java | 69
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java | 32
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java | 64
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java | 106
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java | 7
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java | 66
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java | 114
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java | 325
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java | 478
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java | 234
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java | 588
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java | 691
33 files changed, 5741 insertions, 12 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java
new file mode 100644
index 000000000..0540d0be1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+/**
+ * Core interface for a projected column. Models a column throughout the
+ * projection lifecycle. Columns evolve from unresolved to resolved at
+ * different times. Each class that derives from this interface acts
+ * as a column "node", declaring its node type so that each mechanism
+ * can easily recognize and process its own nodes.
+ * <p>
+ * For example, an implicit column is processed at the file schema
+ * resolution phase, converting from unresolved to resolved. At the same
+ * time, table columns remain unresolved, waiting for the table schema
+ * to appear.
+ * <p>
+ * In an advanced, experimental feature, schema persistence sees some
+ * columns transition from resolved to unresolved and back again.
+ * <p>
+ * Having all column nodes derive from this same interface keeps things
+ * tidy.
+ */
+
+public interface ColumnProjection {
+
+ public static final int FRAMEWORK_BASE_ID = 100;
+ public static final int READER_BASE_ID = 200;
+
+ /**
+ * The name of the column as it appears in the output
+ * row (record batch.)
+ *
+ * @return the output column name
+ */
+ String name();
+
+ /**
+ * A node type unique to each node. Nodes defined in this package
+ * use IDs less than {@link #FRAMEWORK_BASE_ID}. Nodes defined by
+ * frameworks (for file metadata columns or for other special
+ * columns) start numbering with {@link #FRAMEWORK_BASE_ID}. Readers
+ * may need their own specialized nodes, which must use IDs starting
+ * with {@link #READER_BASE_ID}.
+ * <p>
+ * This system solves two problems:
+ * <ol>
+ * <li>Provides an efficient way for each mechanism to recognize its
+ * own nodes without using <code>instanceof</code>.</li>
+ * <li>Allows for frameworks and readers to be added without changing
+ * any base enum. This works because every instance of this mechanism
+ * sees only the base nodes, those from a single framework and those
+ * from a single reader; there is no need for a universal ID registry.
+ * Two frameworks can use identical IDs because they never mix.
+ * Similarly, two readers can use the same IDs because Drill does not
+ * allow a single scan operator to use two different kinds of readers.
+ * </li>
+ * </ol>
+ * @return the numeric ID for this node, used for each layer to
+ * recognize its own nodes
+ */
+ int nodeType();
+}
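
As an illustration (not part of this patch), a hypothetical framework-defined column might claim an ID at or above FRAMEWORK_BASE_ID and be recognized by that ID rather than by instanceof. The class name below is invented for the sketch:

public class ExampleFrameworkColumn implements ColumnProjection {

  // Hypothetical ID; frameworks number their nodes from FRAMEWORK_BASE_ID upward.
  public static final int ID = ColumnProjection.FRAMEWORK_BASE_ID + 1;

  private final String name;

  public ExampleFrameworkColumn(String name) {
    this.name = name;
  }

  @Override
  public String name() { return name; }

  @Override
  public int nodeType() { return ID; }
}

// A framework-specific resolver then picks out only its own nodes:
// if (col.nodeType() == ExampleFrameworkColumn.ID) { ... }
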
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java
new file mode 100644
index 000000000..6d2e5d121
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+
+/**
+ * Populate metadata columns: either file metadata (AKA "implicit
+ * columns") or directory metadata (AKA "partition columns.") In both
+ * cases the column type is nullable Varchar and the column value
+ * is predefined by the projection planner; this class just copies
+ * that value into each row.
+ * <p>
+ * The values for the columns appear in the column definitions.
+ * This works because the metadata columns are re-resolved
+ * for each file, picking up file-specific values. In some cases,
+ * a column might not even have a value (such as a reference to
+ * a dirN level that isn't defined for a particular file.)
+ * <p>
+ * That said, this class is agnostic about the source and meaning
+ * of the columns: it only cares that the columns are of type
+ * nullable Varchar and that the values are in the column nodes.
+ */
+
+public class ConstantColumnLoader extends StaticColumnLoader {
+
+ public interface ConstantColumnSpec {
+ String name();
+ MaterializedField schema();
+ String value();
+ }
+
+ private final String values[];
+ private final List<? extends ConstantColumnSpec> constantCols;
+
+ public ConstantColumnLoader(ResultVectorCache vectorCache,
+ List<? extends ConstantColumnSpec> defns) {
+ super(vectorCache);
+
+ // Populate the loader schema from that provided.
+ // Cache values for faster access.
+ // TODO: Rewrite to specify schema up front
+
+ constantCols = defns;
+ RowSetLoader schema = loader.writer();
+ values = new String[defns.size()];
+ for (int i = 0; i < defns.size(); i++) {
+ ConstantColumnSpec defn = defns.get(i);
+ values[i] = defn.value();
+ schema.addColumn(defn.schema());
+ }
+ }
+
+ @Override
+ public VectorContainer load(int rowCount) {
+ loader.startBatch();
+ RowSetLoader writer = loader.writer();
+ for (int i = 0; i < rowCount; i++) {
+ writer.start();
+ loadRow(writer);
+ writer.save();
+ }
+ return loader.harvest();
+ }
+
+ /**
+ * Populate static vectors with the defined static values.
+ *
+   * @param writer writer used to set the constant value of
+   * each column for the current row
+ */
+
+ private void loadRow(TupleWriter writer) {
+ for (int i = 0; i < values.length; i++) {
+
+ // Set the column (of any type) to null if the string value
+ // is null.
+
+ if (values[i] == null) {
+ writer.scalar(i).setNull();
+ } else {
+ // Else, set the static (string) value.
+
+ writer.scalar(i).setString(values[i]);
+ }
+ }
+ }
+
+ public List<? extends ConstantColumnSpec> columns() { return constantCols; }
+}
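
A rough usage sketch (not part of the patch): each spec carries both the column schema and its constant value, and load() stamps that value into every row. The vectorCache and rowCount below are assumed to come from the surrounding scan framework, and Types.optional() is the usual Drill helper for a nullable type; the "dir0" value is purely illustrative.

ConstantColumnSpec partitionDir = new ConstantColumnSpec() {
  @Override public String name() { return "dir0"; }
  @Override public MaterializedField schema() {
    return MaterializedField.create("dir0", Types.optional(MinorType.VARCHAR));
  }
  @Override public String value() { return "2018-01-01"; }
};

ConstantColumnLoader loader =
    new ConstantColumnLoader(vectorCache, Collections.singletonList(partitionDir));
VectorContainer constants = loader.load(rowCount); // one "2018-01-01" per row
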
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
new file mode 100644
index 000000000..41cc59582
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Perform a schema projection for the case of an explicit list of
+ * projected columns. Example: SELECT a, b, c.
+ * <p>
+ * An explicit projection starts with the requested set of columns,
+ * then looks in the table schema to find matches. That is, it is
+ * driven by the query itself.
+ * <p>
+ * An explicit projection may include columns that do not exist in
+ * the source schema. In this case, we fill in null columns for
+ * unmatched projections.
+ */
+
+public class ExplicitSchemaProjection extends SchemaLevelProjection {
+
+ public ExplicitSchemaProjection(ScanLevelProjection scanProj,
+ TupleMetadata tableSchema,
+ ResolvedTuple rootTuple,
+ List<SchemaProjectionResolver> resolvers) {
+ super(resolvers);
+ resolveRootTuple(scanProj, rootTuple, tableSchema);
+ }
+
+ private void resolveRootTuple(ScanLevelProjection scanProj,
+ ResolvedTuple rootTuple,
+ TupleMetadata tableSchema) {
+ for (ColumnProjection col : scanProj.columns()) {
+ if (col.nodeType() == UnresolvedColumn.UNRESOLVED) {
+ resolveColumn(rootTuple, ((UnresolvedColumn) col).element(), tableSchema);
+ } else {
+ resolveSpecial(rootTuple, col, tableSchema);
+ }
+ }
+ }
+
+ private void resolveColumn(ResolvedTuple outputTuple,
+ RequestedColumn inputCol, TupleMetadata tableSchema) {
+ int tableColIndex = tableSchema.index(inputCol.name());
+ if (tableColIndex == -1) {
+ resolveNullColumn(outputTuple, inputCol);
+ } else {
+ resolveTableColumn(outputTuple, inputCol,
+ tableSchema.metadata(tableColIndex),
+ tableColIndex);
+ }
+ }
+
+ private void resolveTableColumn(ResolvedTuple outputTuple,
+ RequestedColumn requestedCol,
+ ColumnMetadata column, int sourceIndex) {
+
+ // Is the requested column implied to be a map?
+ // A requested column is a map if the user requests x.y and we
+ // are resolving column x. The presence of y as a member implies
+ // that x is a map.
+
+ if (requestedCol.isTuple()) {
+ resolveMap(outputTuple, requestedCol, column, sourceIndex);
+ }
+
+ // Is the requested column implied to be an array?
+ // This occurs when the projection list contains at least one
+ // array index reference such as x[10].
+
+ else if (requestedCol.isArray()) {
+ resolveArray(outputTuple, requestedCol, column, sourceIndex);
+ }
+
+ // A plain old column. Might be an array or a map, but if
+ // so, the request list just mentions it by name without implying
+ // the column type. That is, the project list just contains x
+ // by itself.
+
+ else {
+ projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+ }
+ }
+
+ private void resolveMap(ResolvedTuple outputTuple,
+ RequestedColumn requestedCol, ColumnMetadata column,
+ int sourceIndex) {
+
+ // If the actual column isn't a map, then the request is invalid.
+
+ if (! column.isMap()) {
+ throw UserException
+ .validationError()
+ .message("Project list implies a map column, but actual column is not a map")
+ .addContext("Projected column", requestedCol.fullName())
+ .addContext("Actual type", column.type().name())
+ .build(logger);
+ }
+
+ // The requested column is implied to be a map because it lists
+ // members to project. Project these.
+
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple,
+ column.schema(), sourceIndex);
+ resolveTuple(mapCol.members(), requestedCol.mapProjection(),
+ column.mapSchema());
+
+ // If the projection is simple, then just project the map column
+ // as is. A projection is simple if all map columns from the table
+ // are projected, and no null columns are needed. The simple case
+ // occurs more often than one might expect because the result set
+ // loader only projected those columns that were needed, so the only
+ // issue we have to handle is null columns.
+ //
+ // In the simple case, we discard the map tuple just created
+ // since we ended up not needing it.
+
+ if (mapCol.members().isSimpleProjection()) {
+ outputTuple.removeChild(mapCol.members());
+ projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+ }
+
+ // The resolved tuple may have a subset of table columns
+ // and/or null columns. Project a new map that will be created
+ // to hold the projected map elements.
+
+ else {
+ outputTuple.add(mapCol);
+ }
+ }
+
+ private void resolveTuple(ResolvedTuple mapTuple,
+ RequestedTuple requestedTuple, TupleMetadata mapSchema) {
+ for (RequestedColumn col : requestedTuple.projections()) {
+ resolveColumn(mapTuple, col, mapSchema);
+ }
+ }
+
+ private void resolveArray(ResolvedTuple outputTuple,
+ RequestedColumn requestedCol, ColumnMetadata column,
+ int sourceIndex) {
+
+    // If the actual column isn't an array or a list,
+ // then the request is invalid.
+
+ if (column.type() != MinorType.LIST && ! column.isArray()) {
+ throw UserException
+ .validationError()
+ .message("Project list implies an array, but actual column is not an array")
+ .addContext("Projected column", requestedCol.fullName())
+ .addContext("Actual cardinality", column.mode().name())
+ .build(logger);
+ }
+
+ // The project operator will do the actual array element
+ // projection.
+
+ projectTableColumn(outputTuple, requestedCol, column, sourceIndex);
+ }
+
+ /**
+ * Project a column to the specified output tuple. The name comes from the
+ * project list. (If the actual column name is `X` (upper case), but the
+ * project list requests `x` (lower case), project the column using the
+   * lower-case name.) The column type comes from the table column. The source
+ * index is the location in the table map or row.
+ *
+ * @param outputTuple
+ * projected tuple being built
+ * @param requestedCol
+ * column as requested in the project list
+ * @param column
+ * metadata for the actual table column
+ * @param sourceIndex
+ * index of the column within the table tuple (implies the location
+ * of the table vector to be projected)
+ */
+
+ private void projectTableColumn(ResolvedTuple outputTuple,
+ RequestedColumn requestedCol,
+ ColumnMetadata column, int sourceIndex) {
+ outputTuple.add(
+ new ResolvedTableColumn(requestedCol.name(),
+ MaterializedField.create(requestedCol.name(),
+ column.majorType()),
+ outputTuple, sourceIndex));
+ }
+
+ /**
+ * Resolve a null column. This is a projected column which does not match
+ * an implicit or table column. We consider two cases: a simple top-level
+ * column reference ("a", say) and an implied map reference ("a.b", say.)
+ * If the column appears to be a map, determine the set of children, which
+ * map appear to any depth, that were requested.
+ */
+
+ private void resolveNullColumn(ResolvedTuple outputTuple,
+ RequestedColumn requestedCol) {
+ ResolvedColumn nullCol;
+ if (requestedCol.isTuple()) {
+ nullCol = resolveMapMembers(outputTuple, requestedCol);
+ } else {
+ nullCol = outputTuple.nullBuilder.add(requestedCol.name());
+ }
+ outputTuple.add(nullCol);
+ }
+
+ /**
+   * Resolve a requested map column that does not exist in the table.
+   * Recurse to build the full set of null child columns, which may
+   * themselves be maps.
+   *
+   * @param outputTuple the tuple that will hold the resulting null map column
+   * @param col the projected column implied to be a map
+   * @return the resolved map column with its null members
+ */
+
+ private ResolvedColumn resolveMapMembers(ResolvedTuple outputTuple, RequestedColumn col) {
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple, col.name());
+ ResolvedTuple members = mapCol.members();
+ for (RequestedColumn child : col.mapProjection().projections()) {
+ if (child.isTuple()) {
+ members.add(resolveMapMembers(members, child));
+ } else {
+ members.add(outputTuple.nullBuilder.add(child.name()));
+ }
+ }
+ return mapCol;
+ }
+}
\ No newline at end of file
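
For a sense of what resolveNullColumn() and resolveMapMembers() produce, here is a hand-built sketch (not part of the patch) of the structure created when the project list contains a.b.c but column `a` does not exist in the table: `a` becomes a null map, `b` a nested null map, and `c` a null column. ResolvedRow is the nested class defined later in ResolvedTuple.java within this same patch.

NullColumnBuilder nullBuilder = new NullColumnBuilder(null, false);
ResolvedRow root = new ResolvedRow(nullBuilder);

// "a" is implied to be a map because the project list references a.b.c.
ResolvedMapColumn a = new ResolvedMapColumn(root, "a");
root.add(a);

// "b" is likewise an implied map, nested inside "a".
ResolvedMapColumn b = new ResolvedMapColumn(a.members(), "b");
a.members().add(b);

// The leaf "c" becomes a null column inside the inner map.
b.members().add(b.members().nullBuilder().add("c"));
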
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java
new file mode 100644
index 000000000..fe1e0240a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+
+/**
+ * Queries can contain a wildcard (*), table columns, or special
+ * system-defined columns (the file metadata columns AKA implicit
+ * columns, the `columns` column of CSV, etc.).
+ * <p>
+ * This class provides a generalized way of handling such extended
+ * columns. That is, this handles metadata for columns defined by
+ * the scan or file; columns defined by the table (the actual
+ * data metadata) is handled elsewhere.
+ * <p>
+ * Objects of this interface are driven by the projection processing
+ * framework which provides a vector cache from which to obtain
+ * materialized columns. The implementation must provide a projection
+ * parser to pick out the columns which this object handles.
+ * <p>
+ * A better name might be ImplicitMetadataManager to signify that
+ * this is about metadata other than table columns.
+ */
+public interface MetadataManager {
+
+ void bind(ResultVectorCache vectorCache);
+
+ ScanProjectionParser projectionParser();
+
+ SchemaProjectionResolver resolver();
+
+ /**
+ * Define (materialize) the columns which this manager
+ * represents.
+ */
+ void define();
+
+ /**
+ * Load data into the custom columns, if needed (such as for
+ * null or implicit columns.)
+ *
+ * @param rowCount number of rows read into a batch.
+ */
+ void load(int rowCount);
+
+ /**
+ * Event indicating the end of a file (or other data source.)
+ */
+ void endFile();
+
+ /**
+ * Event indicating the end of a scan.
+ */
+ void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java
new file mode 100644
index 000000000..ec95de8a7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+
+/**
+ * Do-nothing implementation of the metadata manager. Allows the
+ * metadata manager to be optional without needing an if-statement
+ * on every access.
+ */
+
+public class NoOpMetadataManager implements MetadataManager {
+
+ @Override
+ public void bind(ResultVectorCache vectorCache) { }
+
+ @Override
+ public ScanProjectionParser projectionParser() { return null; }
+
+ @Override
+ public SchemaProjectionResolver resolver() {
+ // The resolver is requested only for user-defined metadata
+ // managers, not for this default, no-op version. If this
+ // method is called, something is amiss with the default
+ // setup.
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public void define() { }
+
+ @Override
+ public void load(int rowCount) { }
+
+ @Override
+ public void endFile() { }
+
+ @Override
+ public void close() { }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
new file mode 100644
index 000000000..7e9d2fdd4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader.NullColumnSpec;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Manages null columns by creating a null column loader for each
+ * set of non-empty null columns. This class acts as a scan-wide
+ * facade around the per-schema null column loader.
+ */
+
+public class NullColumnBuilder implements VectorSource {
+
+ /**
+   * The null columns to be created for the current schema, if any.
+ */
+
+ protected final List<NullColumnSpec> nullCols = new ArrayList<>();
+ private NullColumnLoader nullColumnLoader;
+ private VectorContainer outputContainer;
+
+ /**
+ * The reader-specified null type if other than the default.
+ */
+
+ private final MajorType nullType;
+ private final boolean allowRequiredNullColumns;
+
+ public NullColumnBuilder(
+ MajorType nullType, boolean allowRequiredNullColumns) {
+ this.nullType = nullType;
+ this.allowRequiredNullColumns = allowRequiredNullColumns;
+ }
+
+ public NullColumnBuilder newChild() {
+ return new NullColumnBuilder(nullType, allowRequiredNullColumns);
+ }
+
+ public ResolvedNullColumn add(String name) {
+ return add(name, null);
+ }
+
+ public ResolvedNullColumn add(String name, MajorType type) {
+ final ResolvedNullColumn col = new ResolvedNullColumn(name, type, this, nullCols.size());
+ nullCols.add(col);
+ return col;
+ }
+
+ public void build(ResultVectorCache vectorCache) {
+ close();
+
+ // If no null columns for this schema, no need to create
+ // the loader.
+
+ if (hasColumns()) {
+ nullColumnLoader = new NullColumnLoader(vectorCache, nullCols, nullType, allowRequiredNullColumns);
+ outputContainer = nullColumnLoader.output();
+ }
+ }
+
+ public boolean hasColumns() {
+ return nullCols != null && ! nullCols.isEmpty();
+ }
+
+ public void load(int rowCount) {
+ if (nullColumnLoader != null) {
+ final VectorContainer output = nullColumnLoader.load(rowCount);
+ assert output == outputContainer;
+ }
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return outputContainer.getValueVector(index).getValueVector();
+ }
+
+ @VisibleForTesting
+ public VectorContainer output() { return outputContainer; }
+
+ public void close() {
+ if (nullColumnLoader != null) {
+ nullColumnLoader.close();
+ nullColumnLoader = null;
+ }
+ if (outputContainer != null) {
+ outputContainer.clear();
+ outputContainer = null;
+ }
+ }
+}
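
A usage sketch (not from the patch), assuming a ResultVectorCache and per-batch rowCount supplied by the scan framework: null columns are registered as the schema is resolved, materialized once per schema via build(), and then filled for each batch via load().

NullColumnBuilder nullBuilder = new NullColumnBuilder(null, false); // default nullable INT

// Registered during schema resolution for projected columns with no match.
nullBuilder.add("extra_col");                                // type chosen by NullColumnLoader
nullBuilder.add("amount", Types.optional(MinorType.FLOAT8)); // caller-suggested type

// Once per schema: create (or reuse, via the cache) the backing null vectors.
nullBuilder.build(vectorCache);

// Once per batch: fill the null vectors to match the reader's row count.
nullBuilder.load(rowCount);
VectorContainer nullColumns = nullBuilder.output();
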
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java
new file mode 100644
index 000000000..0c36cb99d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Create and populate null columns for the case in which a SELECT statement
+ * refers to columns that do not exist in the actual table. Nullable and array
+ * types are suitable for null columns. (Drill defines an empty array as the
+ * same as a null array: not true, but the best we have at present.) Required
+ * types cannot be used as we don't know what value to set into the column
+ * values.
+ * <p>
+ * Seeks to preserve "vector continuity" by reusing vectors when possible.
+ * Cases:
+ * <ul>
+ * <li>A column a was available in a prior reader (or batch), but is no longer
+ * available, and is thus null. Reuses the type and vector of the prior reader
+ * (or batch) to prevent trivial schema changes.</li>
+ * <li>A column has an implied type (specified in the metadata about the
+ * column provided by the reader.) That type information is used instead of
+ * the defined null column type.</li>
+ * <li>A column has no type information. The type becomes the null column type
+ * defined by the reader (or nullable INT by default.)</li>
+ * <li>Required columns are not suitable. If any of the above found a required
+ * type, convert the type to nullable.</li>
+ * <li>The resulting column and type, whatever it turned out to be, is placed
+ * into the vector cache so that it can be reused by the next reader or batch,
+ * to again preserve vector continuity.</li>
+ * </ul>
+ * The above rules eliminate "trivial" schema changes, but can still result in
+ * "hard" schema changes if a required type is replaced by a nullable type.
+ */
+
+public class NullColumnLoader extends StaticColumnLoader {
+
+ public interface NullColumnSpec {
+ String name();
+ MajorType type();
+ void setType(MajorType type);
+ }
+
+ public static final MajorType DEFAULT_NULL_TYPE = MajorType.newBuilder()
+ .setMinorType(MinorType.INT)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+
+ private final MajorType nullType;
+ private final boolean allowRequired;
+ private final boolean isArray[];
+
+ public NullColumnLoader(ResultVectorCache vectorCache, List<? extends NullColumnSpec> defns,
+ MajorType nullType, boolean allowRequired) {
+ super(vectorCache);
+
+    // Normally, null columns must be optional or arrays. However,
+    // we allow required columns either if the client requests it,
+    // or if the client's requested null type is itself required.
+    // (The generated "null column" vectors will go into the vector
+    // cache and be pulled back out; we must preserve the required
+    // mode in this case.)
+
+ this.allowRequired = allowRequired ||
+ (nullType != null && nullType.getMode() == DataMode.REQUIRED);
+
+ // Use the provided null type, else the standard nullable int.
+
+ if (nullType == null) {
+ this.nullType = DEFAULT_NULL_TYPE;
+ } else {
+ this.nullType = nullType;
+ }
+
+ // Populate the loader schema from that provided
+
+ RowSetLoader schema = loader.writer();
+ isArray = new boolean[defns.size()];
+ for (int i = 0; i < defns.size(); i++) {
+ NullColumnSpec defn = defns.get(i);
+ MaterializedField colSchema = selectType(defn);
+ isArray[i] = colSchema.getDataMode() == DataMode.REPEATED;
+ schema.addColumn(colSchema);
+ }
+ }
+
+ /**
+   * Implements the type mapping algorithm, preferring the best fit
+   * to preserve the schema and resorting to changes only when needed.
+ * @param defn output column definition
+ * @return type of the empty column that implements the definition
+ */
+
+ private MaterializedField selectType(NullColumnSpec defn) {
+
+ // Prefer the type of any previous occurrence of
+ // this column.
+
+ MajorType type = vectorCache.getType(defn.name());
+
+ // Else, use the type defined in the projection, if any.
+
+ if (type == null) {
+ type = defn.type();
+ }
+ if (type != null && ! allowRequired && type.getMode() == DataMode.REQUIRED) {
+
+ // Type was found in the vector cache and the type is required.
+ // The client determines whether to map required types to optional.
+ // The text readers use required Varchar columns for missing columns,
+ // and so no required-to-optional mapping is desired. Other readers
+ // want to use nulls for missing columns, and so need the
+ // required-to-nullable mapping.
+
+ type = MajorType.newBuilder()
+ .setMinorType(type.getMinorType())
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ }
+
+ // Else, use the specified null type.
+
+ if (type == null) {
+ type = nullType;
+ }
+
+ // If the schema had the special NULL type, replace it with the
+ // null column type.
+
+ if (type.getMinorType() == MinorType.NULL) {
+ type = nullType;
+ }
+ defn.setType(type);
+ return MaterializedField.create(defn.name(), type);
+ }
+
+ public VectorContainer output() {
+ return loader.outputContainer();
+ }
+
+ @Override
+ public VectorContainer load(int rowCount) {
+ loader.startBatch();
+ loader.skipRows(rowCount);
+ return loader.harvest();
+ }
+}
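
The type-selection rules above can be seen in a small sketch (not part of the patch). It assumes an empty vector cache, so no prior type is found for either column, and reuses ResolvedNullColumn as the NullColumnSpec implementation; the vector sources are irrelevant to the loader and left null here, and vectorCache is assumed to come from the scan framework.

List<ResolvedNullColumn> defns = new ArrayList<>();

// No type information: resolves to the default nullable INT.
defns.add(new ResolvedNullColumn("a", null, null, 0));

// REQUIRED VARCHAR requested while allowRequired is false:
// converted to OPTIONAL VARCHAR.
defns.add(new ResolvedNullColumn("b",
    MajorType.newBuilder()
        .setMinorType(MinorType.VARCHAR)
        .setMode(DataMode.REQUIRED)
        .build(),
    null, 1));

NullColumnLoader loader = new NullColumnLoader(vectorCache, defns, null, false);
VectorContainer nullBatch = loader.load(10); // 10 rows, every value null
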
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java
new file mode 100644
index 000000000..a658629f2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * A resolved column has a name, and a specification for how to project
+ * data from a source vector to a vector in the final output container.
+ * Describes the projection of a single column from
+ * an input to an output batch.
+ * <p>
+ * Although the table schema mechanism uses the newer "metadata"
+ * mechanism, resolved columns revert to the original
+ * {@link MajorType} and {@link MaterializedField} mechanism used
+ * by the rest of Drill. Doing so loses a bit of additional
+ * information, but at present there is no way to export that information
+ * along with a serialized record batch; each operator must rediscover
+ * it after deserialization.
+ */
+
+public abstract class ResolvedColumn implements ColumnProjection {
+
+ public final VectorSource source;
+ public final int sourceIndex;
+
+ public ResolvedColumn(VectorSource source, int sourceIndex) {
+ this.source = source;
+ this.sourceIndex = sourceIndex;
+ }
+
+ public VectorSource source() { return source; }
+
+ public int sourceIndex() { return sourceIndex; }
+
+ /**
+ * Return the type of this column. Used primarily by the schema smoothing
+ * mechanism.
+ *
+ * @return the MaterializedField representation of this column
+ */
+
+ public abstract MaterializedField schema();
+
+ public void project(ResolvedTuple dest) {
+ dest.addVector(source.vector(sourceIndex));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java
new file mode 100644
index 000000000..4a158f7b0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedMap;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedMapArray;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedSingleMap;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Represents a column which is implicitly a map (because it has children
+ * in the project list), but which does not match any column in the table.
+ * This kind of column gives rise to a map of null columns in the output.
+ */
+
+public class ResolvedMapColumn extends ResolvedColumn {
+
+ public static final int ID = 17;
+
+ private final MaterializedField schema;
+ private final ResolvedTuple parent;
+ private final ResolvedMap members;
+
+ public ResolvedMapColumn(ResolvedTuple parent, String name) {
+ super(parent, -1);
+ schema = MaterializedField.create(name,
+ Types.required(MinorType.MAP));
+ this.parent = parent;
+ members = new ResolvedSingleMap(this);
+ parent.addChild(members);
+ }
+
+ public ResolvedMapColumn(ResolvedTuple parent,
+ MaterializedField schema, int sourceIndex) {
+ super(parent, sourceIndex);
+ this.schema = schema;
+ this.parent = parent;
+
+ // This column corresponds to an input map.
+ // We may have to create a matching new output
+ // map. Determine whether it is a single map or
+ // an array.
+
+ assert schema.getType().getMinorType() == MinorType.MAP;
+ if (schema.getType().getMode() == DataMode.REPEATED) {
+ members = new ResolvedMapArray(this);
+ } else {
+ members = new ResolvedSingleMap(this);
+ }
+ parent.addChild(members);
+ }
+
+ public ResolvedTuple parent() { return parent; }
+
+ @Override
+ public String name() { return schema.getName(); }
+
+ @Override
+ public int nodeType() { return ID; }
+
+ public ResolvedTuple members() { return members; }
+
+ @Override
+ public void project(ResolvedTuple dest) {
+ dest.addVector(members.buildMap());
+ }
+
+ @Override
+ public MaterializedField schema() { return schema; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
new file mode 100644
index 000000000..7694b7e71
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader.NullColumnSpec;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Projected column that serves as both a resolved column (provides projection
+ * mapping) and a null column spec (provides the information needed to create
+ * the required null vectors.)
+ */
+
+public class ResolvedNullColumn extends ResolvedColumn implements NullColumnSpec {
+
+ public static final int ID = 4;
+
+ private final String name;
+ private MajorType type;
+
+ public ResolvedNullColumn(String name, MajorType type, VectorSource source, int sourceIndex) {
+ super(source, sourceIndex);
+ this.name = name;
+ this.type = type;
+ }
+
+ @Override
+ public String name() { return name; }
+
+ @Override
+ public MajorType type() { return type; }
+
+ @Override
+ public int nodeType() { return ID; }
+
+ @Override
+ public void setType(MajorType type) {
+
+ // Update the actual type based on what the null-column
+ // mechanism chose for this column.
+
+ this.type = type;
+ }
+
+ @Override
+ public MaterializedField schema() {
+ return MaterializedField.create(name, type);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java
new file mode 100644
index 000000000..f17802a6c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Column that matches one provided by the table. Provides the data type
+ * of that column and information to project from the result set loader
+ * output container to the scan output container. (Note that the result
+ * set loader container is, itself, a projection from the actual table
+ * schema to the desired set of columns; but in the order specified
+ * by the table.)
+ */
+
+public class ResolvedTableColumn extends ResolvedColumn {
+
+ public static final int ID = 3;
+
+ public final String projectedName;
+ public final MaterializedField schema;
+
+ public ResolvedTableColumn(String projectedName,
+ MaterializedField schema,
+ VectorSource source, int sourceIndex) {
+ super(source, sourceIndex);
+ this.projectedName = projectedName;
+ this.schema = schema;
+ }
+
+ @Override
+ public String name() { return projectedName; }
+
+ @Override
+ public MaterializedField schema() { return schema; }
+
+ @Override
+ public int nodeType() { return ID; }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" name=")
+ .append(name())
+ .append("]");
+ return buf.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
new file mode 100644
index 000000000..c6e15106b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Drill rows are made up of a tree of tuples, with the row being the root
+ * tuple. Each tuple contains columns, some of which may be maps. This
+ * class represents each row or map in the output projection.
+ * <p>
+ * Output columns within the tuple can be projected from the data source,
+ * might be null (requested columns that don't match a data source column)
+ * or might be a constant (such as an implicit column.) This class
+ * orchestrates assembling an output tuple from a collection of these
+ * three column types. (Though implicit columns appear only in the root
+ * tuple.)
+ *
+ * <h4>Null Handling</h4>
+ *
+ * The project list might reference a "missing" map if the project list
+ * includes, say, <tt>SELECT a.b.c</tt> but <tt>`a`</tt> does not exist
+ * in the data source. In this case, the column a is implied to be a map,
+ * so the projection mechanism will create a null map for <tt>`a`</tt>
+ * and <tt>`b`</tt>, and will create a null column for <tt>`c`</tt>.
+ * <p>
+ * To accomplish this recursive null processing, each tuple is associated
+ * with a null builder. (The null builder can be null if projection is
+ * implicit with a wildcard; in such a case no null columns can occur.
+ * But, even here, with schema persistence, a <tt>SELECT *</tt> query
+ * may need null columns if a second file does not contain a column
+ * that appeared in a first file.)
+ * <p>
+ * The null builder is bound to each tuple to allow vector persistence
+ * via the result vector cache. If we must create a null column
+ * <tt>`x`</tt> in two different readers, then the rules of Drill
+ * require that the same vector be used for both (or else a schema
+ * change is signaled.) The vector cache works by name (and type).
+ * Since maps may contain columns with the same names as other maps,
+ * the vector cache must be associated with each tuple. And, by extension,
+ * the null builder must also be associated with each tuple.
+ *
+ * <h4>Lifecycle</h4>
+ *
+ * The lifecycle of a resolved tuple is:
+ * <ul>
+ * <li>The projection mechanism creates the output tuple, and its columns,
+ * by comparing the project list against the table schema. The result is
+ * a set of table, null, or constant columns.</li>
+ * <li>Once per schema change, the resolved tuple creates the output
+ * tuple by linking to vectors in their original locations. As it turns out,
+ * we can simply share the vectors; we don't need to transfer the buffers.</li>
+ * <li>To prepare for the transfer, the tuple asks the null column builder
+ * (if present) to build the required null columns.</li>
+ * <li>Once the output tuple is built, it can be used for any number of
+ * batches without further work. (The same vectors appear in the various inputs
+ * and the output, eliminating the need for any transfers.)</li>
+ * <li>Once per batch, the client must set the row count. This is needed for the
+ * output container, and for any "null" maps that the project may have created.</li>
+ * </ul>
+ *
+ * <h4>Projection Mapping</h4>
+ *
+ * Each column is mapped into the output tuple (vector container or map) in
+ * the order that the columns are defined here. (That order follows the project
+ * list for explicit projection, or the table schema for implicit projection.)
+ * The source, however, may be in any order (at least for the table schema.)
+ * A projection mechanism identifies the {@link VectorSource} that supplies the
+ * vector for the column, along with the vector's index within that source.
+ * The resolved tuple is bound to an output tuple. The projection mechanism
+ * grabs the input vector from the vector source at the indicated position, and
+ * links it into the output tuple, represented by this projected tuple, at the
+ * position of the resolved column in the child list.
+ *
+ * <h4>Caveats</h4>
+ *
+ * The project mechanism handles nested "missing" columns as mentioned
+ * above. This works to create null columns within maps that are defined by the
+ * data source. However, the mechanism does not currently handle creating null
+ * columns within repeated maps or lists. Doing so is possible, but requires
+ * adding a level of cardinality computation to create the proper number of
+ * "inner" values.
+ */
+
+public abstract class ResolvedTuple implements VectorSource {
+
+ /**
+ * Represents the top-level tuple which is projected to a
+ * vector container.
+ */
+
+ public static class ResolvedRow extends ResolvedTuple {
+
+ private VectorContainer input;
+ private VectorContainer output;
+
+ public ResolvedRow(NullColumnBuilder nullBuilder) {
+ super(nullBuilder);
+ }
+
+ public void project(VectorContainer input, VectorContainer output) {
+ this.input = input;
+ this.output = output;
+ output.removeAll();
+ buildColumns();
+ output.buildSchema(SelectionVectorMode.NONE);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return input.getValueVector(index).getValueVector();
+ }
+
+ @Override
+ public void addVector(ValueVector vector) {
+ output.add(vector);
+ }
+
+ @Override
+ public void setRowCount(int rowCount) {
+ output.setRecordCount(rowCount);
+ cascadeRowCount(rowCount);
+ }
+
+ @Override
+ public BufferAllocator allocator() {
+ return output.getAllocator();
+ }
+
+ @Override
+ public String name() {
+
+ // Never used in production, just for debugging.
+
+ return "$root$";
+ }
+
+ public VectorContainer output() { return output; }
+
+ @Override
+ public int innerCardinality(int rowCount) { return rowCount; }
+ }
+
+ /**
+ * Represents a map implied by the project list, whether or not the map
+ * actually appears in the table schema.
+ * The column is implied to be a map because it contains
+ * children. This implementation builds the map and its children.
+ */
+
+ public static abstract class ResolvedMap extends ResolvedTuple {
+
+ protected final ResolvedMapColumn parentColumn;
+ protected AbstractMapVector inputMap;
+ protected AbstractMapVector outputMap;
+
+ public ResolvedMap(ResolvedMapColumn parentColumn) {
+ super(parentColumn.parent().nullBuilder == null ? null : parentColumn.parent().nullBuilder.newChild());
+ this.parentColumn = parentColumn;
+ }
+
+ @Override
+ public void addVector(ValueVector vector) {
+ outputMap.putChild(vector.getField().getName(), vector);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ assert inputMap != null;
+ return inputMap.getChildByOrdinal(index);
+ }
+
+ public AbstractMapVector buildMap() {
+ if (parentColumn.sourceIndex() != -1) {
+ final ResolvedTuple parentTuple = parentColumn.parent();
+ inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex());
+ }
+ final MaterializedField colSchema = parentColumn.schema();
+ outputMap = createMap(inputMap,
+ MaterializedField.create(
+ colSchema.getName(), colSchema.getType()),
+ parentColumn.parent().allocator());
+ buildColumns();
+ return outputMap;
+ }
+
+ protected abstract AbstractMapVector createMap(AbstractMapVector inputMap,
+ MaterializedField create, BufferAllocator allocator);
+
+ @Override
+ public BufferAllocator allocator() {
+ return outputMap.getAllocator();
+ }
+
+ @Override
+ public String name() { return parentColumn.name(); }
+ }
+
+ public static class ResolvedSingleMap extends ResolvedMap {
+
+ public ResolvedSingleMap(ResolvedMapColumn parentColumn) {
+ super(parentColumn);
+ }
+
+ @Override
+ protected AbstractMapVector createMap(AbstractMapVector inputMap,
+ MaterializedField schema, BufferAllocator allocator) {
+ return new MapVector(schema,
+ allocator, null);
+ }
+
+ @Override
+ public void setRowCount(int rowCount) {
+ ((MapVector) outputMap).setMapValueCount(rowCount);
+ cascadeRowCount(rowCount);
+ }
+
+ @Override
+ public int innerCardinality(int outerCardinality) {
+ return outerCardinality;
+ }
+ }
+
+ /**
+ * Represents a map tuple (not the map column, rather the value of the
+ * map column.) When projecting, we create a new repeated map vector,
+ * but share the offsets vector from input to output. The size of the
+ * offset vector reveals the number of elements in the "inner" array,
+ * which is the number of null values to create if null columns are
+ * added.
+ */
+
+ public static class ResolvedMapArray extends ResolvedMap {
+
+ private int valueCount;
+
+ public ResolvedMapArray(ResolvedMapColumn parentColumn) {
+ super(parentColumn);
+ }
+
+ @Override
+ protected AbstractMapVector createMap(AbstractMapVector inputMap,
+ MaterializedField schema, BufferAllocator allocator) {
+
+ // Create a new map array, reusing the offset vector from
+ // the original input map.
+
+ final RepeatedMapVector source = (RepeatedMapVector) inputMap;
+ final UInt4Vector offsets = source.getOffsetVector();
+ valueCount = offsets.getAccessor().getValueCount();
+ return new RepeatedMapVector(schema,
+ offsets, null);
+ }
+
+ @Override
+ public int innerCardinality(int outerCardinality) {
+ return valueCount;
+ }
+
+ @Override
+ public void setRowCount(int rowCount) {
+ cascadeRowCount(valueCount);
+ }
+ }
+
+ protected final List<ResolvedColumn> members = new ArrayList<>();
+ protected final NullColumnBuilder nullBuilder;
+ protected List<ResolvedTuple> children;
+ protected VectorSource binding;
+
+ public ResolvedTuple(NullColumnBuilder nullBuilder) {
+ this.nullBuilder = nullBuilder;
+ }
+
+ public NullColumnBuilder nullBuilder() {
+ return nullBuilder;
+ }
+
+ public void add(ResolvedColumn col) {
+ members.add(col);
+ }
+
+ public void addChild(ResolvedTuple child) {
+ if (children == null) {
+ children = new ArrayList<>();
+ }
+ children.add(child);
+ }
+
+ public void removeChild(ResolvedTuple child) {
+ assert ! children.isEmpty() && children.get(children.size()-1) == child;
+ children.remove(children.size()-1);
+ }
+
+ public boolean isSimpleProjection() {
+ if (children != null && ! children.isEmpty()) {
+ return false;
+ }
+ for (int i = 0; i < members.size(); i++) {
+ if (members.get(i).nodeType() == ResolvedNullColumn.ID) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @VisibleForTesting
+ public List<ResolvedColumn> columns() { return members; }
+
+ public void buildNulls(ResultVectorCache vectorCache) {
+ if (nullBuilder != null) {
+ nullBuilder.build(vectorCache);
+ }
+ if (children != null) {
+ for (final ResolvedTuple child : children) {
+ child.buildNulls(vectorCache.childCache(child.name()));
+ }
+ }
+ }
+
+ public void loadNulls(int rowCount) {
+ if (nullBuilder != null) {
+ nullBuilder.load(rowCount);
+ }
+ if (children != null) {
+ for (final ResolvedTuple child : children) {
+ child.loadNulls(innerCardinality(rowCount));
+ }
+ }
+ }
+
+ public abstract int innerCardinality(int outerCardinality);
+
+ /**
+ * Merge two or more <i>partial batches</i> to produce a final output batch.
+ * A partial batch is a vertical slice of a batch, such as the set of null
+ * columns or the set of data columns.
+ * <p>
+ * For example, consider
+ * two partial batches:<pre><code>
+ * (a, d, e)
+ * (c, b)</code></pre>
+ * We may wish to merge them by projecting columns into an output batch
+ * of the form:<pre><code>
+ * (a, b, c, d)</code></pre>
+ * It is not necessary to project all columns from the inputs, but all
+ * columns in the output must have a projection.
+ * <p>
+ * The merger is created once per schema, then can be reused for any
+ * number of batches. The only restriction is that the partial batches must
+ * have the same row count so that the final output batch record
+ * count makes sense.
+ * <p>
+ * Merging is done by discarding any data in the output, then exchanging
+ * the buffers from the input columns to the output, leaving projected
+ * columns empty. Note that unprojected columns must be cleared by the
+ * caller. The caller will have figured out which columns to project and
+ * which not to project.
+ */
+
+ protected void buildColumns() {
+ for (int i = 0; i < members.size(); i++) {
+ members.get(i).project(this);
+ }
+ }
+
+ public abstract void addVector(ValueVector vector);
+
+ public abstract void setRowCount(int rowCount);
+
+ protected void cascadeRowCount(int rowCount) {
+ if (children == null) {
+ return;
+ }
+ for (final ResolvedTuple child : children) {
+ child.setRowCount(rowCount);
+ }
+ }
+
+ public abstract BufferAllocator allocator();
+
+ public abstract String name();
+
+ /**
+ * During planning, discard a partial plan to allow reusing the same (root) tuple
+ * for multiple projection plans.
+ */
+
+ public void reset() {
+ members.clear();
+ children = null;
+ }
+
+ public void close() {
+ if (nullBuilder != null) {
+ nullBuilder.close();
+ }
+ if (children != null) {
+ for (final ResolvedTuple child : children) {
+ child.close();
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
new file mode 100644
index 000000000..e20724521
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
+
+/**
+ * Parses and analyzes the projection list passed to the scanner. The
+ * projection list is per scan, independent of any tables that the
+ * scanner might scan. The projection list is then used as input to the
+ * per-table projection planning.
+ * <p>
+ * In most query engines, this kind of projection analysis is done at
+ * plan time. But, since Drill is schema-on-read, we don't know the
+ * available columns, or their types, until we start scanning a table.
+ * The table may provide the schema up-front, or may discover it as
+ * the read proceeds. Hence, the job here is to make sense of the
+ * project list based on static a-priori information, then to create
+ * a list that can be further resolved against a table schema when it
+ * appears. This gives us two steps:
+ * <ul>
+ * <li>Scan-level projection: this class, that handles schema for the
+ * entire scan operator.</li>
+ * <li>Table-level projection: defined elsewhere, that merges the
+ * table and scan-level projections.</li>
+ * </ul>
+ * <p>
+ * Accepts the inputs needed to
+ * plan a projection, builds the mappings, and constructs the projection
+ * mapping object.
+ * <p>
+ * Builds the per-scan projection plan given a set of projected columns.
+ * Determines the output schema, which columns to project from the data
+ * source, which are metadata, and so on.
+ * <p>
+ * An annoying aspect of SQL is that the projection list (the list of
+ * columns to appear in the output) is specified after the SELECT keyword.
+ * In Relational theory, projection is about columns, selection is about
+ * rows...
+ * <p>
+ * Mappings can be based on four primary use cases:
+ * <ul>
+ * <li><tt>SELECT *</tt>: Project all data source columns, whatever they happen
+ * to be. Create columns using names from the data source. The data source
+ * also determines the order of columns within the row.</li>
+ * <li><tt>SELECT columns</tt>: Similar to SELECT * in that it projects all columns
+ * from the data source, in data source order. But, rather than creating
+ * individual output columns for each data source column, creates a single
+ * column which is an array of Varchars which holds the (text form) of
+ * each column as an array element.</li>
+ * <li><tt>SELECT a, b, c, ...</tt>: Project a specific set of columns, identified by
+ * case-insensitive name. The output row uses the names from the SELECT list,
+ * but types from the data source. Columns appear in the row in the order
+ * specified by the SELECT.</li>
+ * <li><tt>SELECT ...</tt>: SELECT nothing, occurs in <tt>SELECT COUNT(*)</tt>
+ * type queries. The provided projection list contains no (table) columns, though
+ * it may contain metadata columns.</li>
+ * </ul>
+ * Names in the SELECT list can reference any of five distinct types of output
+ * columns:
+ * <ul>
+ * <li>Wildcard ("*") column: indicates the place in the projection list to insert
+ * the table columns once found in the table projection plan.</li>
+ * <li>Data source columns: columns from the underlying table. The table
+ * projection planner will determine if the column exists, or must be filled
+ * in with a null column.</li>
+ * <li>The generic data source columns array: <tt>columns</tt>, or optionally
+ * specific members of the <tt>columns</tt> array such as <tt>columns[1]</tt>.</li>
+ * <li>Implicit columns: <tt>fqn</tt>, <tt>filename</tt>, <tt>filepath</tt>
+ * and <tt>suffix</tt>. These reference
+ * parts of the name of the file being scanned.</li>
+ * <li>Partition columns: <tt>dir0</tt>, <tt>dir1</tt>, ...: These reference
+ * parts of the path name of the file.</li>
+ * </ul>
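+ * <p>
+ * As a rough usage sketch (the projection and parser lists are supplied by
+ * the caller; the names below are placeholders):<pre><code>
+ *   ScanLevelProjection scanProj =
+ *       new ScanLevelProjection(projectionList, parsers);
+ *   if (scanProj.projectAll()) {
+ *     // wildcard query: expand against each table schema later
+ *   }
+ *   for (ColumnProjection col : scanProj.columns()) {
+ *     // scan-level output columns, in output order
+ *   }
+ * </code></pre>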
+ *
+ * @see {@link ImplicitColumnExplorer}, the class from which this class
+ * evolved
+ */
+
+public class ScanLevelProjection {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class);
+
+ /**
+ * Interface for add-on parsers, avoids the need to create
+ * a single, tightly-coupled parser for all types of columns.
+ * The main parser handles wildcards and assumes the rest of
+ * the columns are table columns. The add-on parser can tag
+ * columns as special, such as to hold metadata.
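+   * <p>
+   * A minimal, illustrative add-on parser might claim a single custom column
+   * by name. The column name and the <tt>MyCustomColumn</tt> projection class
+   * below are hypothetical:<pre><code>
+   *   class MyColumnParser implements ScanProjectionParser {
+   *     private ScanLevelProjection builder;
+   *     public void bind(ScanLevelProjection builder) { this.builder = builder; }
+   *     public boolean parse(RequestedColumn inCol) {
+   *       if (! inCol.name().equalsIgnoreCase("myCol")) { return false; }
+   *       builder.addMetadataColumn(new MyCustomColumn(inCol)); // hypothetical column
+   *       return true;  // column consumed; skip the default table-column handling
+   *     }
+   *     public void validate() { }
+   *     public void validateColumn(ColumnProjection col) { }
+   *     public void build() { }
+   *   }
+   * </code></pre>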
+ */
+
+ public interface ScanProjectionParser {
+ void bind(ScanLevelProjection builder);
+ boolean parse(RequestedColumn inCol);
+ void validate();
+ void validateColumn(ColumnProjection col);
+ void build();
+ }
+
+ // Input
+
+ protected final List<SchemaPath> projectionList;
+
+ // Configuration
+
+ protected List<ScanProjectionParser> parsers;
+ private final boolean v1_12MetadataLocation;
+
+ // Internal state
+
+ protected boolean sawWildcard;
+
+ // Output
+
+ protected List<ColumnProjection> outputCols = new ArrayList<>();
+ protected RequestedTuple outputProjection;
+ protected boolean hasWildcard;
+ protected boolean emptyProjection = true;
+
+ /**
+ * Specify the set of columns in the SELECT list. Since the column list
+ * comes from the query planner, assumes that the planner has checked
+ * the list for syntax and uniqueness.
+ *
+   * @param projectionList list of columns in the SELECT list, in SELECT list order
+   * @param parsers add-on parsers that handle custom (e.g. metadata) columns
+   * @param v1_12MetadataLocation true to use the Drill 1.12 metadata column
+   * position (before data columns) rather than the default after-data position
+ */
+ public ScanLevelProjection(List<SchemaPath> projectionList,
+ List<ScanProjectionParser> parsers,
+ boolean v1_12MetadataLocation) {
+ this.projectionList = projectionList;
+ this.parsers = parsers;
+ this.v1_12MetadataLocation = v1_12MetadataLocation;
+ doParse();
+ }
+
+ private void doParse() {
+ outputProjection = RequestedTupleImpl.parse(projectionList);
+
+ for (ScanProjectionParser parser : parsers) {
+ parser.bind(this);
+ }
+ for (RequestedColumn inCol : outputProjection.projections()) {
+ if (inCol.isWildcard()) {
+ mapWildcard(inCol);
+ } else {
+ mapColumn(inCol);
+ }
+ }
+ verify();
+ for (ScanProjectionParser parser : parsers) {
+ parser.build();
+ }
+ }
+
+ public ScanLevelProjection(List<SchemaPath> projectionList,
+ List<ScanProjectionParser> parsers) {
+ this(projectionList, parsers, false);
+ }
+
+ /**
+ * Wildcard is special: add it, then let parsers add any custom
+ * columns that are needed. The order is important: we want custom
+ * columns to follow table columns.
+ */
+
+ private void mapWildcard(RequestedColumn inCol) {
+
+ // Wildcard column: this is a SELECT * query.
+
+ if (sawWildcard) {
+ throw new IllegalArgumentException("Duplicate * entry in project list");
+ }
+
+ // Remember the wildcard position, if we need to insert it.
+ // Ensures that the main wildcard expansion occurs before add-on
+ // columns.
+
+ int wildcardPosn = outputCols.size();
+
+ // Parsers can consume the wildcard. But, all parsers must
+ // have visibility to the wildcard column.
+
+ for (ScanProjectionParser parser : parsers) {
+ if (parser.parse(inCol)) {
+ wildcardPosn = -1;
+ }
+ }
+
+ // Set this flag only after the parser checks.
+
+ sawWildcard = true;
+
+ // If not consumed, put the wildcard column into the projection list as a
+ // placeholder to be filled in later with actual table columns.
+
+ if (wildcardPosn != -1) {
+
+ // Drill 1.1 - 1.11 and Drill 1.13 or later put metadata columns after
+ // data columns. Drill 1.12 moved them before data columns. For testing
+ // and compatibility, the client can request to use the Drill 1.12 position,
+ // though the after-data position is the default.
+ //
+ // Note that the after-data location is much more convenient for the dirx
+ // partition columns since these vary in number across scans within the same query.
+ // By putting them at the end, the index of all other columns remains
+ // constant. Drill 1.12 broke that behavior, but Drill 1.13 restored it.
+ //
+ // This option can be removed in Drill 1.14 after things settle down.
+
+ UnresolvedColumn wildcardCol = new UnresolvedColumn(inCol, UnresolvedColumn.WILDCARD);
+ if (v1_12MetadataLocation) {
+ outputCols.add(wildcardCol);
+ } else {
+ outputCols.add(wildcardPosn, wildcardCol);
+ }
+ hasWildcard = true;
+ emptyProjection = false;
+ }
+ }
+
+ /**
+ * Map the column into one of five categories.
+ * <ol>
+ * <li>Star column (to designate SELECT *)</li>
+ * <li>Partition file column (dir0, dir1, etc.)</li>
+ * <li>Implicit column (fqn, filepath, filename, suffix)</li>
+ * <li>Special <tt>columns</tt> column which holds all columns as
+ * an array.</li>
+ * <li>Table column. The actual match against the table schema
+ * is done later.</li>
+ * </ol>
+ *
+ * Actual mapping is done by parser extensions for all but the
+ * basic cases.
+ *
+ * @param inCol the SELECT column
+ */
+
+ private void mapColumn(RequestedColumn inCol) {
+
+ // Give the extensions first crack at each column.
+ // Some may want to "sniff" a column, even if they
+ // don't fully handle it.
+
+ for (ScanProjectionParser parser : parsers) {
+ if (parser.parse(inCol)) {
+ return;
+ }
+ }
+
+ // This is a desired table column.
+
+ addTableColumn(
+ new UnresolvedColumn(inCol, UnresolvedColumn.UNRESOLVED));
+ }
+
+ public void addTableColumn(ColumnProjection outCol) {
+ outputCols.add(outCol);
+ emptyProjection = false;
+ }
+
+ public void addMetadataColumn(ColumnProjection outCol) {
+ outputCols.add(outCol);
+ }
+
+ /**
+ * Once all columns are identified, perform a final pass
+ * over the set of columns to do overall validation. Each
+ * add-on parser is given an opportunity to do its own
+ * validation.
+ */
+
+ private void verify() {
+
+ // Let parsers do overall validation.
+
+ for (ScanProjectionParser parser : parsers) {
+ parser.validate();
+ }
+
+ // Validate column-by-column.
+
+ for (ColumnProjection outCol : outputCols) {
+ for (ScanProjectionParser parser : parsers) {
+ parser.validateColumn(outCol);
+ }
+ switch (outCol.nodeType()) {
+ case UnresolvedColumn.UNRESOLVED:
+ if (hasWildcard()) {
+ throw new IllegalArgumentException("Cannot select table columns and * together");
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ /**
+ * Return the set of columns from the SELECT list
+ * @return the SELECT list columns, in SELECT list order
+ */
+
+ public List<SchemaPath> requestedCols() { return projectionList; }
+
+ /**
+ * The entire set of output columns, in output order. Output order is
+ * that specified in the SELECT (for an explicit list of columns) or
+ * table order (for SELECT * queries).
+ * @return the set of output columns in output order
+ */
+
+ public List<ColumnProjection> columns() { return outputCols; }
+
+ public boolean hasWildcard() { return hasWildcard; }
+
+ /**
+ * Return whether this is a SELECT * query
+ * @return true if this is a SELECT * query
+ */
+
+ public boolean projectAll() { return hasWildcard; }
+
+ /**
+ * Returns true if the projection list is empty. This usually
+ * indicates a <tt>SELECT COUNT(*)</tt> query (though the scan
+ * operator does not have the context to know that an empty
+ * list does, in fact, imply a count-only query...)
+ *
+ * @return true if no table columns are projected, false
+ * if at least one column is projected (or the query contained
+ * the wildcard)
+ */
+
+ public boolean projectNone() { return emptyProjection; }
+
+ public RequestedTuple rootProjection() { return outputProjection; }
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" projection=")
+ .append(outputCols.toString())
+ .append("]")
+ .toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
new file mode 100644
index 000000000..c2b0262fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Performs projection of a record reader, along with a set of static
+ * columns, to produce the final "public" result set (record batch)
+ * for the scan operator. Primarily solves the "vector permanence"
+ * problem: that the scan operator must present the same set of vectors
+ * to downstream operators despite the fact that the scan operator hosts
+ * a series of readers, each of which builds its own result set.
+ * <p>
+ * Provides the option to continue a schema from one batch to the next.
+ * This can reduce spurious schema changes in formats, such as JSON, with
+ * varying fields. It is not, however, a complete solution as the outcome
+ * still depends on the order of file scans and the division of files across
+ * readers.
+ * <p>
+ * Provides the option to infer the schema from the first batch. The "quick path"
+ * to obtain the schema will read one batch, then use that schema as the returned
+ * schema, returning the full batch in the next call to <tt>next()</tt>.
+ *
+ * <h4>Publishing the Final Result Set</h4>
+ *
+ * This class "publishes" a vector container that has the final, projected
+ * form of a scan. The projected schema includes:
+ * <ul>
+ * <li>Columns from the reader.</li>
+ * <li>Static columns, such as implicit or partition columns.</li>
+ * <li>Null columns for items in the select list, but not found in either
+ * of the above two categories.</li>
+ * </ul>
+ * The order of columns is that set by the select list (or, by the reader for
+ * a <tt>SELECT *</tt> query).
+ *
+ * <h4>Schema Handling</h4>
+ *
+ * The mapping handles a variety of cases:
+ * <ul>
+ * <li>An early-schema table (one in which we know the schema and
+ * the schema remains constant for the whole table.)</li>
+ * <li>A late schema table (one in which we discover the schema as
+ * we read the table, and where the schema can change as the read
+ * progresses.)<ul>
+ * <li>Late schema table with SELECT * (we want all columns, whatever
+ * they happen to be.)</li>
+ * <li>Late schema with explicit select list (we want only certain
+ * columns when they happen to appear in the input.)</li></ul></li>
+ * </ul>
+ *
+ * <h4>Implementation Overview</h4>
+ *
+ * Major tasks of this class include:
+ * <ul>
+ * <li>Project table columns (change position and/or name).</li>
+ * <li>Insert static and null columns.</li>
+ * <li>Schema smoothing. That is, if table A produces columns (a, b), but
+ * table B produces only (a), use the type of the first table's b column for the
+ * null created for the missing b in table B.</li>
+ * <li>Vector persistence: use the same set of vectors across readers as long
+ * as the reader schema does not cause a "hard" schema change (change in type,
+ * introduction of a new column).</li>
+ * <li>Detection of schema changes (change of type, introduction of a new column
+ * for a <tt>SELECT *</tt> query), changing the projected schema, and reporting
+ * the change downstream.</li>
+ * </ul>
+ * A projection is needed to:
+ * <ul>
+ * <li>Reorder table columns</li>
+ * <li>Select a subset of table columns</li>
+ * <li>Fill in missing select columns</li>
+ * <li>Fill in implicit or partition columns</li>
+ * </ul>
+ * Creates and returns the batch merger that does the projection.
+ *
+ * <h4>Projection</h4>
+ *
+ * To visualize this, assume we have numbered table columns, lettered
+ * implicit, null or partition columns:<pre><code>
+ * [ 1 | 2 | 3 | 4 ] Table columns in table order
+ * [ A | B | C ] Static columns
+ * </code></pre>
+ * Now, we wish to project them into select order.
+ * Let's say that the SELECT clause looked like this, with "t"
+ * indicating table columns:<pre><code>
+ * SELECT t2, t3, C, B, t1, A, t2 ...
+ * </code></pre>
+ * Then the projection looks like this:<pre><code>
+ * [ 2 | 3 | C | B | 1 | A | 2 ]
+ * </code></pre>
+ * Often, not all table columns are projected. In this case, the
+ * result set loader presents the full table schema to the reader,
+ * but actually writes only the projected columns. Suppose we
+ * have:<pre><code>
+ * SELECT t3, C, B, t1, A ...
+ * </code></pre>
+ * Then the abbreviated table schema looks like this:<pre><code>
+ * [ 1 | 3 ]</code></pre>
+ * Note that table columns retain their table ordering.
+ * The projection looks like this:<pre><code>
+ * [ 2 | C | B | 1 | A ]
+ * </code></pre>
+ * <p>
+ * The projector is created once per schema, then can be reused for any
+ * number of batches.
+ * <p>
+ * Merging is done in one of two ways, depending on the input source:
+ * <ul>
+ * <li>For the table loader, the merger discards any data in the output,
+ * then exchanges the buffers from the input columns to the output,
+ * leaving projected columns empty. Note that unprojected columns must
+ * be cleared by the caller.</li>
+ * <li>For implicit and null columns, the output vector is identical
+ * to the input vector.</li>
+ * </ul>
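+ *
+ * <h4>Typical Use</h4>
+ *
+ * A rough sketch of how a scan operator might drive this class; the variable
+ * names and optional calls shown are illustrative only:<pre><code>
+ *   ScanSchemaOrchestrator scanOrchestrator = new ScanSchemaOrchestrator(allocator);
+ *   scanOrchestrator.withMetadata(metadataManager);  // optional
+ *   scanOrchestrator.setNullType(nullType);          // optional
+ *   scanOrchestrator.enableSchemaSmoothing(true);    // optional
+ *   scanOrchestrator.build(projectionList);          // parse the projection list
+ *   // ... run readers; see ReaderSchemaOrchestrator ...
+ *   VectorContainer downstream = scanOrchestrator.output();
+ *   scanOrchestrator.close();
+ * </code></pre>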
+ */
+
+public class ScanSchemaOrchestrator {
+
+ public static final int MIN_BATCH_BYTE_SIZE = 256 * 1024;
+ public static final int MAX_BATCH_BYTE_SIZE = Integer.MAX_VALUE;
+ public static final int DEFAULT_BATCH_ROW_COUNT = 4096;
+ public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
+ public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+ /**
+   * Orchestrates projection tasks for a single reader within the set that the
+ * scan operator manages. Vectors are reused across readers, but via a vector
+ * cache. All other state is distinct between readers.
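+   * <p>
+   * An illustrative per-reader sequence, using placeholder names; the actual
+   * row-writing calls depend on the {@link ResultSetLoader} API:<pre><code>
+   *   ReaderSchemaOrchestrator readerOrchestrator = scanOrchestrator.startReader();
+   *   ResultSetLoader tableLoader = readerOrchestrator.makeTableLoader(tableSchema);
+   *   while (readerHasData) {
+   *     readerOrchestrator.startBatch();
+   *     // ... write rows through the table loader ...
+   *     readerOrchestrator.endBatch();  // merge table, metadata and null columns
+   *     // scanOrchestrator.output() now holds the projected batch
+   *   }
+   *   scanOrchestrator.closeReader();
+   * </code></pre>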
+ */
+
+ public class ReaderSchemaOrchestrator implements VectorSource {
+
+ private int readerBatchSize;
+ private ResultSetLoaderImpl tableLoader;
+ private int prevTableSchemaVersion = -1;
+
+ /**
+ * Assembles the table, metadata and null columns into the final output
+ * batch to be sent downstream. The key goal of this class is to "smooth"
+ * schema changes in this output batch by absorbing trivial schema changes
+ * that occur across readers.
+ */
+
+ private ResolvedRow rootTuple;
+ private VectorContainer tableContainer;
+
+ public ReaderSchemaOrchestrator() {
+ readerBatchSize = scanBatchRecordLimit;
+ }
+
+ public void setBatchSize(int size) {
+ if (size > 0) {
+ readerBatchSize = Math.min(size, scanBatchRecordLimit);
+ }
+ }
+
+ public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
+ OptionBuilder options = new OptionBuilder();
+ options.setRowCountLimit(readerBatchSize);
+ options.setVectorCache(vectorCache);
+ options.setBatchSizeLimit(scanBatchByteLimit);
+
+ // Set up a selection list if available and is a subset of
+ // table columns. (Only needed for non-wildcard queries.)
+ // The projection list includes all candidate table columns
+ // whether or not they exist in the up-front schema. Handles
+ // the odd case where the reader claims a fixed schema, but
+ // adds a column later.
+
+ if (! scanProj.projectAll()) {
+ options.setProjectionSet(scanProj.rootProjection());
+ }
+ options.setSchema(tableSchema);
+
+ // Create the table loader
+
+ tableLoader = new ResultSetLoaderImpl(allocator, options.build());
+
+ // If a schema is given, create a zero-row batch to announce the
+ // schema downstream in the form of an empty batch.
+
+ if (tableSchema != null) {
+ tableLoader.startEmptyBatch();
+ endBatch();
+ }
+
+ return tableLoader;
+ }
+
+ public boolean hasSchema() {
+ return prevTableSchemaVersion >= 0;
+ }
+
+ public void startBatch() {
+ tableLoader.startBatch();
+ }
+
+ /**
+ * Build the final output batch by projecting columns from the three input sources
+ * to the output batch. First, build the metadata and/or null columns for the
+ * table row count. Then, merge the sources.
+ */
+
+ public void endBatch() {
+
+ // Get the batch results in a container.
+
+ tableContainer = tableLoader.harvest();
+
+ // If the schema changed, set up the final projection based on
+ // the new (or first) schema.
+
+ if (prevTableSchemaVersion < tableLoader.schemaVersion()) {
+ reviseOutputProjection();
+ } else {
+
+ // Fill in the null and metadata columns.
+
+ populateNonDataColumns();
+ }
+ rootTuple.setRowCount(tableContainer.getRecordCount());
+ }
+
+ private void populateNonDataColumns() {
+ int rowCount = tableContainer.getRecordCount();
+ metadataManager.load(rowCount);
+ rootTuple.loadNulls(rowCount);
+ }
+
+ /**
+ * Create the list of null columns by comparing the SELECT list against the
+ * columns available in the batch schema. Create null columns for those that
+ * are missing. This is done for the first batch, and any time the schema
+ * changes. (For early-schema, the projection occurs once as the schema is set
+ * up-front and does not change.) For a SELECT *, the null column check
+ * only need be done if null columns were created when mapping from a prior
+ * schema.
+ */
+
+ private void reviseOutputProjection() {
+
+ // Do the table-schema level projection; the final matching
+ // of projected columns to available columns.
+
+ TupleMetadata tableSchema = tableLoader.harvestSchema();
+ if (schemaSmoother != null) {
+ doSmoothedProjection(tableSchema);
+ } else if (scanProj.hasWildcard()) {
+ doWildcardProjection(tableSchema);
+ } else {
+ doExplicitProjection(tableSchema);
+ }
+
+ // Combine metadata, nulls and batch data to form the final
+ // output container. Columns are created by the metadata and null
+ // loaders only in response to a batch, so create the first batch.
+
+ rootTuple.buildNulls(vectorCache);
+ metadataManager.define();
+ populateNonDataColumns();
+ rootTuple.project(tableContainer, outputContainer);
+ prevTableSchemaVersion = tableLoader.schemaVersion();
+ }
+
+ private void doSmoothedProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(
+ new NullColumnBuilder(nullType, allowRequiredNullColumns));
+ schemaSmoother.resolve(tableSchema, rootTuple);
+ }
+
+ /**
+ * Query contains a wildcard. The schema-level projection includes
+ * all columns provided by the reader.
+ */
+
+ private void doWildcardProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(null);
+ new WildcardSchemaProjection(scanProj,
+ tableSchema, rootTuple, schemaResolvers);
+ }
+
+ /**
+ * Explicit projection: include only those columns actually
+ * requested by the query, which may mean filling in null
+ * columns for projected columns that don't actually exist
+ * in the table.
+ *
+ * @param tableSchema newly arrived schema
+ */
+
+ private void doExplicitProjection(TupleMetadata tableSchema) {
+ rootTuple = new ResolvedRow(
+ new NullColumnBuilder(nullType, allowRequiredNullColumns));
+ new ExplicitSchemaProjection(scanProj,
+ tableSchema, rootTuple,
+ schemaResolvers);
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return tableContainer.getValueVector(index).getValueVector();
+ }
+
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ if (tableLoader != null) {
+ tableLoader.close();
+ tableLoader = null;
+ }
+ }
+ catch (RuntimeException e) {
+ ex = e;
+ }
+ try {
+ if (rootTuple != null) {
+ rootTuple.close();
+ rootTuple = null;
+ }
+ }
+ catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ }
+ metadataManager.endFile();
+ if (ex != null) {
+ throw ex;
+ }
+ }
+ }
+
+ // Configuration
+
+ /**
+ * Custom null type, if provided by the operator. If
+ * not set, the null type is the Drill default.
+ */
+
+ private MajorType nullType;
+
+ /**
+ * Creates the metadata (file and directory) columns, if needed.
+ */
+
+ private MetadataManager metadataManager;
+ private final BufferAllocator allocator;
+ private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
+ private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
+ private boolean v1_12MetadataLocation;
+ private final List<ScanProjectionParser> parsers = new ArrayList<>();
+
+ /**
+ * List of resolvers used to resolve projection columns for each
+ * new schema. Allows operators to introduce custom functionality
+ * as a plug-in rather than by copying code or subclassing this
+ * mechanism.
+ */
+
+ List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>();
+
+ private boolean useSchemaSmoothing;
+ private boolean allowRequiredNullColumns;
+
+ // Internal state
+
+ /**
+ * Cache used to preserve the same vectors from one output batch to the
+ * next to keep the Project operator happy (which depends on exactly the
+   * same vectors).
+ * <p>
+ * If the Project operator ever changes so that it depends on looking up
+ * vectors rather than vector instances, this cache can be deprecated.
+ */
+
+ private ResultVectorCacheImpl vectorCache;
+ private ScanLevelProjection scanProj;
+ private ReaderSchemaOrchestrator currentReader;
+ private SchemaSmoother schemaSmoother;
+
+ // Output
+
+ private VectorContainer outputContainer;
+
+ public ScanSchemaOrchestrator(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ /**
+ * Specify an optional metadata manager. Metadata is a set of constant
+ * columns with per-reader values. For file-based sources, this is usually
+ * the implicit and partition columns; but it could be other items for other
+ * data sources.
+ *
+ * @param metadataMgr the application-specific metadata manager to use
+ * for this scan
+ */
+
+ public void withMetadata(MetadataManager metadataMgr) {
+ metadataManager = metadataMgr;
+ schemaResolvers.add(metadataManager.resolver());
+ }
+
+ /**
+ * Specify a custom batch record count. This is the maximum number of records
+ * per batch for this scan. Readers can adjust this, but the adjustment is capped
+   * at the value specified here.
+   *
+   * @param batchRecordLimit maximum records per batch
+ */
+
+ public void setBatchRecordLimit(int batchRecordLimit) {
+ scanBatchRecordLimit = Math.max(1,
+ Math.min(batchRecordLimit, ValueVector.MAX_ROW_COUNT));
+ }
+
+ public void setBatchByteLimit(int byteLimit) {
+ scanBatchByteLimit = Math.max(MIN_BATCH_BYTE_SIZE,
+ Math.min(byteLimit, MAX_BATCH_BYTE_SIZE));
+ }
+
+ /**
+ * Specify the type to use for null columns in place of the standard
+ * nullable int. This type is used for all missing columns. (Readers
+ * that need per-column control need a different mechanism.)
+ *
+   * @param nullType the type to use for null columns in place of the
+   * standard nullable int
+ */
+
+ public void setNullType(MajorType nullType) {
+ this.nullType = nullType;
+ }
+
+ /**
+   * Enable schema smoothing: introduces an additional level of schema
+ * resolution each time a schema changes from a reader.
+ *
+ * @param flag true to enable schema smoothing, false to disable
+ */
+
+ public void enableSchemaSmoothing(boolean flag) {
+ useSchemaSmoothing = flag;
+ }
+
+ public void allowRequiredNullColumns(boolean flag) {
+ allowRequiredNullColumns = flag;
+ }
+
+ public void useDrill1_12MetadataPosition(boolean flag) {
+ v1_12MetadataLocation = flag;
+ }
+
+ public void build(List<SchemaPath> projection) {
+ vectorCache = new ResultVectorCacheImpl(allocator, useSchemaSmoothing);
+
+ // If no metadata manager was provided, create a mock
+ // version just to keep code simple.
+
+ if (metadataManager == null) {
+ metadataManager = new NoOpMetadataManager();
+ }
+ metadataManager.bind(vectorCache);
+
+ // Bind metadata manager parser to scan projector.
+ // A "real" (non-mock) metadata manager will provide
+ // a projection parser. Use this to tell us that this
+ // setup supports metadata.
+
+ ScanProjectionParser parser = metadataManager.projectionParser();
+ if (parser != null) {
+
+ // For compatibility with Drill 1.12, insert the file metadata
+ // parser before others so that, in a wildcard query, metadata
+ // columns appear before others (such as the `columns` column.)
+ // This is temporary and should be removed once the test framework
+ // is restored to Drill 1.11 functionality.
+
+ if (v1_12MetadataLocation) {
+ parsers.add(0, parser);
+ } else {
+ parsers.add(parser);
+ }
+ }
+
+ // Parse the projection list.
+
+ scanProj = new ScanLevelProjection(projection, parsers, v1_12MetadataLocation);
+
+ if (scanProj.hasWildcard() && useSchemaSmoothing) {
+ schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers);
+ }
+
+ // Build the output container.
+
+ outputContainer = new VectorContainer(allocator);
+ }
+
+ public void addParser(ScanProjectionParser parser) {
+ parsers.add(parser);
+ }
+
+ public void addResolver(SchemaProjectionResolver resolver) {
+ schemaResolvers.add(resolver);
+ }
+
+ public ReaderSchemaOrchestrator startReader() {
+ closeReader();
+ currentReader = new ReaderSchemaOrchestrator();
+ return currentReader;
+ }
+
+ public boolean isProjectNone() {
+ return scanProj.projectNone();
+ }
+
+ public boolean hasSchema() {
+ return currentReader != null && currentReader.hasSchema();
+ }
+
+ public VectorContainer output() {
+ return outputContainer;
+ }
+
+ public void closeReader() {
+ if (currentReader != null) {
+ currentReader.close();
+ currentReader = null;
+ }
+ }
+
+ public void close() {
+ closeReader();
+ if (outputContainer != null) {
+ outputContainer.clear();
+ outputContainer = null;
+ }
+ vectorCache.close();
+ metadataManager.close();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
new file mode 100644
index 000000000..c7bae278e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Computes the full output schema given a table (or batch)
+ * schema. Takes the original, unresolved output list from the projection
+ * definition, merges it with the file, directory and table schema information,
+ * and produces a partially or fully resolved output list.
+ * <p>
+ * A "resolved" projection list is a list of concrete columns: table
+ * columns, nulls, file metadata or partition metadata. An unresolved list
+ * has either table column names, but no match, or a wildcard column.
+ * <p>
+ * The idea is that the projection list moves through stages of resolution
+ * depending on which information is available. An "early schema" table
+ * provides schema information up front, and so allows fully resolving
+ * the projection list on table open. A "late schema" table allows only a
+ * partially resolved projection list, with the remainder of resolution
+ * happening on the first (or perhaps every) batch.
+ * <p>
+ * Data source (table) schema can be of two forms:
+ * <ul>
+ * <li>Early schema: the schema is known before reading data. A JDBC data
+ * source is an example, as is a CSV reader for a file with headers.</li>
+ * <li>Late schema: the schema is not known until data is read, and is
+ * discovered on the fly. Example: JSON, which declares values as maps
+ * without an up-front schema.</li>
+ * </ul>
+ * These two forms give rise to distinct ways of planning the projection.
+ * <p>
+ * The final result of the projection is a set of "output" columns: a set
+ * of columns that, taken together, defines the row (bundle of vectors) that
+ * the scan operator produces. Columns are ordered: the order specified here
+ * must match the order that columns appear in the result set loader and the
+ * vector container so that code can access columns by index as well as name.
+ *
+ * @see {@link ImplicitColumnExplorer}, the class from which this class
+ * evolved
+ */
+
+public class SchemaLevelProjection {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class);
+
+ /**
+ * Schema-level projection is customizable. Implement this interface, and
+ * add an instance to the scan orchestrator, to perform custom mappings
+ * from unresolved columns (perhaps of an extension-specified type) to
+ * final projected columns. The metadata manager, for example, implements
+ * this interface to map metadata columns.
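+   * <p>
+   * A rough sketch of a custom resolver; <tt>MY_NODE_TYPE</tt> and
+   * <tt>myType</tt> below are illustrative placeholders:<pre><code>
+   *   class MyResolver implements SchemaProjectionResolver {
+   *     public void startResolution() { }
+   *     public boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
+   *         TupleMetadata tableSchema) {
+   *       if (col.nodeType() != MY_NODE_TYPE) {
+   *         return false;  // not ours; let another resolver try
+   *       }
+   *       // Resolve to a concrete column; here, a null column of a known type.
+   *       tuple.add(tuple.nullBuilder().add(col.name(), myType));
+   *       return true;
+   *     }
+   *   }
+   * </code></pre>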
+ */
+
+ public interface SchemaProjectionResolver {
+ void startResolution();
+ boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
+ TupleMetadata tableSchema);
+ }
+
+ protected final List<SchemaProjectionResolver> resolvers;
+
+ protected SchemaLevelProjection(
+ List<SchemaProjectionResolver> resolvers) {
+ if (resolvers == null) {
+ resolvers = new ArrayList<>();
+ }
+ this.resolvers = resolvers;
+ for (SchemaProjectionResolver resolver : resolvers) {
+ resolver.startResolution();
+ }
+ }
+
+ protected void resolveSpecial(ResolvedTuple rootOutputTuple, ColumnProjection col,
+ TupleMetadata tableSchema) {
+ for (SchemaProjectionResolver resolver : resolvers) {
+ if (resolver.resolveColumn(col, rootOutputTuple, tableSchema)) {
+ return;
+ }
+ }
+ throw new IllegalStateException("No resolver for column: " + col.nodeType());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
new file mode 100644
index 000000000..b15fe9a2a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Implements a "schema smoothing" algorithm that provides schema
+ * persistence for the wildcard selection (i.e. SELECT *).
+ * <p>
+ * Constraints:
+ * <ul>
+ * <li>Adding columns causes a hard schema change.</li>
+ * <li>Removing columns is allowed, uses type from previous
+ * schema, as long as previous mode was nullable or repeated.</li>
+ * <li>Changing type or mode causes a hard schema change.</li>
+ * <li>Changing column order is fine; use order from previous
+ * schema.</li>
+ * </ul>
+ * This can all be boiled down to a simpler rule:
+ * <ul>
+ * <li>Schema persistence is possible if the output schema
+ * from a prior schema can be reused for the current schema.</li>
+ * <li>Else, a hard schema change occurs and a new output
+ * schema is derived from the new table schema.</li>
+ * </ul>
+ * The core idea here is to "unresolve" a fully-resolved table schema
+ * to produce a new projection list that is the equivalent of using that
+ * prior projection list in the SELECT. Then, keep that projection list only
+ * if it is compatible with the next table schema, else throw it away and
+ * start over from the actual scan projection list.
+ * <p>
+ * Algorithm:
+ * <ul>
+ * <li>If partitions are included in the wildcard, and the new
+ * file needs more than the current one, create a new schema.</li>
+ * <li>Else, treat partitions as select, fill in missing with
+ * nulls.</li>
+ * <li>From an output schema, construct a new select list
+ * specification as though the columns in the current schema were
+ * explicitly specified in the SELECT clause.</li>
+ * <li>For each new schema column, verify that the column exists
+ * in the generated SELECT clause and is of the same type.
+ * If not, create a new schema.</li>
+ * <li>Use the generated schema to plan a new projection from
+ * the new schema to the prior schema.</li>
+ * </ul>
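+ * <p>
+ * As used by the scan orchestrator (an illustrative sketch with
+ * caller-supplied names):<pre><code>
+ *   SchemaSmoother smoother = new SchemaSmoother(scanProj, schemaResolvers);
+ *   // Then, for each new table schema:
+ *   ResolvedRow rootTuple = new ResolvedRow(
+ *       new NullColumnBuilder(nullType, allowRequiredNullColumns));
+ *   smoother.resolve(tableSchema, rootTuple);  // reuses the prior schema if compatible
+ * </code></pre>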
+ */
+
+public class SchemaSmoother {
+
+ /**
+ * Exception thrown if the prior schema is not compatible with the
+ * new table schema.
+ */
+
+ @SuppressWarnings("serial")
+ public static class IncompatibleSchemaException extends Exception { }
+
+ private final ScanLevelProjection scanProj;
+ private final List<SchemaProjectionResolver> resolvers;
+ private ResolvedTuple priorSchema;
+ private int schemaVersion = 0;
+
+ public SchemaSmoother(ScanLevelProjection scanProj,
+ List<SchemaProjectionResolver> resolvers) {
+ this.scanProj = scanProj;
+ this.resolvers = resolvers;
+ }
+
+ public SchemaLevelProjection resolve(
+ TupleMetadata tableSchema,
+ ResolvedTuple outputTuple) {
+
+ // If a prior schema exists, try resolving the new table using the
+ // prior schema. If this works, use the projection. Else, start
+ // over with the scan projection.
+
+ if (priorSchema != null) {
+ try {
+ SmoothingProjection smoother = new SmoothingProjection(scanProj, tableSchema,
+ priorSchema, outputTuple, resolvers);
+ priorSchema = outputTuple;
+ return smoother;
+ } catch (IncompatibleSchemaException e) {
+ outputTuple.reset();
+ // Fall through
+ }
+ }
+
+ // Can't use the prior schema. Start over with the original scan projection.
+ // Type smoothing is provided by the vector cache; but a hard schema change
+ // will occur because either a type has changed or a new column has appeared.
+ // (Or, this is the first schema.)
+
+ SchemaLevelProjection schemaProj = new WildcardSchemaProjection(scanProj,
+ tableSchema, outputTuple, resolvers);
+ priorSchema = outputTuple;
+ schemaVersion++;
+ return schemaProj;
+ }
+
+ public int schemaVersion() { return schemaVersion; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java
new file mode 100644
index 000000000..da199b8c5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Resolve a table schema against the prior schema. This works only if the
+ * types match and if all columns in the table schema already appear in the
+ * prior schema.
+ * <p>
+ * Consider this an experimental mechanism. The hope was that, with clever
+ * techniques, we could "smooth over" some of the issues that cause schema
+ * change events in Drill. As it turned out, however, creating this mechanism
+ * revealed that it is not possible, even in theory, to handle most schema
+ * changes because of the time dimension:
+ * <ul>
+ * <li>An event in a later batch may provide information that would have
+ * caused us to make a different decision in an earlier batch. For example,
+ * we are asked for column `foo`, did not see such a column in the first
+ * batch, block or file, guessed some type, and later saw that the column
+ * was of a different type. We can't "time travel" to tell our earlier
+ * selves, nor, when we make the initial type decision, can we jump to
+ * the future to see what type we'll discover.</li>
+ * <li>Readers in this fragment may see column `foo` but readers in
+ * another fragment read files/blocks that don't have that column. The
+ * two readers cannot communicate to agree on a type.</li>
+ * </ul>
+ * <p>
+ * What this mechanism can do is make decisions based on history: when a
+ * column appears, we can adjust its type a bit to try to avoid an
+ * unnecessary change. For example, if a prior file in this scan saw
+ * `foo` as nullable Varchar, but the present file has the column as
+ * required Varchar, we can use the more general nullable form. But,
+ * again, the "can't predict the future" bites us: we can handle a
+ * nullable-to-required column change, but not vice versa.
+ * <p>
+ * What this mechanism will tell the careful reader is that the only
+ * general solution to the schema-change problem is to know the full
+ * schema up front: for the planner to be told the schema and to
+ * communicate that schema to all readers so that all readers agree
+ * on the final schema.
+ * <p>
+ * When that is done, the techniques shown here can be used to adjust
+ * any per-file variation of schema to match the up-front schema.
+ */
+
+public class SmoothingProjection extends SchemaLevelProjection {
+
+ protected final List<MaterializedField> rewrittenFields = new ArrayList<>();
+
+ public SmoothingProjection(ScanLevelProjection scanProj,
+ TupleMetadata tableSchema,
+ ResolvedTuple priorSchema,
+ ResolvedTuple outputTuple,
+ List<SchemaProjectionResolver> resolvers) throws IncompatibleSchemaException {
+
+ super(resolvers);
+
+ for (ResolvedColumn priorCol : priorSchema.columns()) {
+ switch (priorCol.nodeType()) {
+ case ResolvedTableColumn.ID:
+ case ResolvedNullColumn.ID:
+ // This is a regular column known to this base framework.
+ resolveColumn(outputTuple, priorCol, tableSchema);
+ break;
+ default:
+ // The column is one known to an add-on mechanism.
+ resolveSpecial(outputTuple, priorCol, tableSchema);
+ }
+ }
+
+ // Check if all table columns were matched. Since names are unique,
+ // each column can be matched at most once. If the number of matches is
+ // less than the total number of columns, then some columns were not
+ // matched and we must start over.
+
+ if (rewrittenFields.size() < tableSchema.size()) {
+ throw new IncompatibleSchemaException();
+ }
+ }
+
+ /**
+ * Resolve a prior column against the current table schema. Resolves to
+ * a table column, a null column, or throws an exception if the
+ * schemas are incompatible
+ *
+ * @param priorCol a column from the prior schema
+ * @throws IncompatibleSchemaException if the prior column exists in
+ * the current table schema, but with an incompatible type
+ */
+
+ private void resolveColumn(ResolvedTuple outputTuple,
+ ResolvedColumn priorCol, TupleMetadata tableSchema) throws IncompatibleSchemaException {
+ int tableColIndex = tableSchema.index(priorCol.name());
+ if (tableColIndex == -1) {
+ resolveNullColumn(outputTuple, priorCol);
+ return;
+ }
+ MaterializedField tableCol = tableSchema.column(tableColIndex);
+ MaterializedField priorField = priorCol.schema();
+ if (! tableCol.isPromotableTo(priorField, false)) {
+ throw new IncompatibleSchemaException();
+ }
+ outputTuple.add(
+ new ResolvedTableColumn(priorCol.name(), priorField, outputTuple, tableColIndex));
+ rewrittenFields.add(priorField);
+ }
+
+ /**
+ * A prior schema column does not exist in the present table column schema.
+ * Create a null column with the same type as the prior column, as long as
+ * the prior column was not required.
+ *
+ * @param priorCol the prior column to project to a null column
+ * @throws IncompatibleSchemaException if the prior column was required
+ * and thus cannot be null-filled
+ */
+
+ private void resolveNullColumn(ResolvedTuple outputTuple,
+ ResolvedColumn priorCol) throws IncompatibleSchemaException {
+ if (priorCol.schema().getType().getMode() == DataMode.REQUIRED) {
+ throw new IncompatibleSchemaException();
+ }
+ outputTuple.add(outputTuple.nullBuilder().add(priorCol.name(),
+ priorCol.schema().getType()));
+ }
+
+ public List<MaterializedField> revisedTableSchema() { return rewrittenFields; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java
new file mode 100644
index 000000000..bfd3614b5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Base class for columns that take values based on the
+ * reader, not individual rows. E.g. null columns (values
+ * are all null) or file metadata (AKA "implicit") columns
+ * that take values based on the file.
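+ * <p>
+ * Subclasses follow a simple per-batch pattern (a sketch; the concrete
+ * loader subclass and names are supplied by the caller):<pre><code>
+ *   StaticColumnLoader staticLoader = ...;  // e.g. a null- or constant-column loader
+ *   VectorContainer staticCols = staticLoader.load(rowCount);  // once per batch
+ *   // rowCount must match the row count of the reader's batch
+ *   staticLoader.close();  // when the reader finishes
+ * </code></pre>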
+ */
+
+public abstract class StaticColumnLoader {
+ protected final ResultSetLoader loader;
+ protected final ResultVectorCache vectorCache;
+
+ public StaticColumnLoader(ResultVectorCache vectorCache) {
+
+ ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+ .setVectorCache(vectorCache)
+ .build();
+ loader = new ResultSetLoaderImpl(vectorCache.allocator(), options);
+ this.vectorCache = vectorCache;
+ }
+
+ /**
+ * Populate static vectors with the defined static values.
+ *
+ * @param rowCount number of rows to generate. Must match the
+ * row count in the batch returned by the reader
+ */
+
+ public abstract VectorContainer load(int rowCount);
+
+ public void close() {
+ loader.close();
+ }
+}
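The constant-column and null-column loaders added elsewhere in this patch are both driven the same way (see the tests further down): build the loader against a vector cache, call load() with the reader's row count for each batch, and close the loader when the reader finishes. Below is a minimal sketch of that calling sequence; the class, method and variable names are illustrative only, not part of the patch.

    import java.util.List;

    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader;
    import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
    import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
    import org.apache.drill.exec.record.VectorContainer;

    // Illustrative sketch, not part of this patch.
    public class StaticColumnLoaderSketch {

      // Produce one batch of per-reader constant columns sized to match the
      // reader's batch, then release the loader's resources.
      public static void loadConstants(BufferAllocator allocator,
          List<ConstantColumnSpec> defns, int rowCount) {
        ResultVectorCacheImpl cache = new ResultVectorCacheImpl(allocator);
        ConstantColumnLoader loader = new ConstantColumnLoader(cache, defns);
        try {
          VectorContainer constants = loader.load(rowCount);
          // ... merge "constants" with the reader's own batch here ...
        } finally {
          loader.close();
        }
      }
    }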
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java
new file mode 100644
index 000000000..2dfa9c44d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+
+/**
+ * Represents a projected column that has not yet been bound to a
+ * table column, special column or a null column. Once bound, this
+ * column projection is replaced with the detailed binding.
+ */
+public class UnresolvedColumn implements ColumnProjection {
+
+ public static final int WILDCARD = 1;
+ public static final int UNRESOLVED = 2;
+
+ /**
+ * The original physical plan column to which this output column
+ * maps. In some cases, multiple output columns may map to the
+ * same "input" (to the projection process) column.
+ */
+
+ protected final RequestedColumn inCol;
+ private final int id;
+
+ public UnresolvedColumn(RequestedColumn inCol, int id) {
+ this.inCol = inCol;
+ this.id = id;
+ }
+
+ @Override
+ public int nodeType() { return id; }
+
+ @Override
+ public String name() { return inCol.name(); }
+
+ public RequestedColumn element() { return inCol; }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf
+ .append("[")
+ .append(getClass().getSimpleName())
+ .append(" type=")
+ .append(id == WILDCARD ? "wildcard" : "column");
+ if (inCol != null) {
+ buf
+ .append(", incol=")
+ .append(inCol.toString());
+ }
+ return buf.append("]").toString();
+ }
+} \ No newline at end of file
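Downstream code dispatches on the nodeType() tag rather than on the Java class; WildcardSchemaProjection below uses exactly this check to decide whether a scan-level column is the wildcard or an as-yet-unresolved table column. A trivial sketch of that check (the helper name is illustrative):

    import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
    import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;

    // Illustrative sketch, not part of this patch.
    public class ColumnTagSketch {

      // True if the scan-level column represents the "*" wildcard rather than
      // a named column still awaiting resolution against the table schema.
      public static boolean isWildcard(ColumnProjection col) {
        return col.nodeType() == UnresolvedColumn.WILDCARD;
      }
    }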
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java
new file mode 100644
index 000000000..625ee73de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Generic mechanism for retrieving vectors from a source tuple when
+ * projecting columns to the output tuple. Works around the fact that
+ * vector containers and maps are both tuples, but have very different
+ * interfaces. Also allows other classes to act as "proxies" for a
+ * source tuple.
+ */
+
+public interface VectorSource {
+ ValueVector vector(int index);
+}
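The simplest implementation of this interface just exposes the vectors of a VectorContainer by position, which is what the RowSetSource helper in TestRowBatchMerger (later in this patch) does for row sets. A minimal sketch over a plain container; the class name is illustrative only.

    import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.vector.ValueVector;

    // Illustrative sketch, not part of this patch.
    public class ContainerVectorSource implements VectorSource {

      private final VectorContainer container;

      public ContainerVectorSource(VectorContainer container) {
        this.container = container;
      }

      @Override
      public ValueVector vector(int index) {
        // Unwrap the container's vector wrapper to reach the underlying vector.
        return container.getValueVector(index).getValueVector();
      }
    }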
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
new file mode 100644
index 000000000..d1cfbe171
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Perform a wildcard projection. In this case, the query wants all
+ * columns in the source table, so the table drives the final projection.
+ * Since we include only the columns actually present in the table, there is no need
+ * to create null columns. Example: SELECT *
+ */
+
+public class WildcardSchemaProjection extends SchemaLevelProjection {
+
+ public WildcardSchemaProjection(ScanLevelProjection scanProj,
+ TupleMetadata tableSchema,
+ ResolvedTuple rootTuple,
+ List<SchemaProjectionResolver> resolvers) {
+ super(resolvers);
+ for (ColumnProjection col : scanProj.columns()) {
+ if (col.nodeType() == UnresolvedColumn.WILDCARD) {
+ projectAllColumns(rootTuple, tableSchema);
+ } else {
+ resolveSpecial(rootTuple, col, tableSchema);
+ }
+ }
+ }
+
+ /**
+ * Project all columns from table schema to the output, in table
+ * schema order. Since we accept any map columns as-is, no need
+ * to do recursive projection.
+ *
+ * @param rootTuple the resolved output tuple under construction
+ * @param tableSchema the actual schema of the table (or batch) being read
+ */
+
+ private void projectAllColumns(ResolvedTuple rootTuple, TupleMetadata tableSchema) {
+ for (int i = 0; i < tableSchema.size(); i++) {
+ MaterializedField colSchema = tableSchema.column(i);
+ rootTuple.add(
+ new ResolvedTableColumn(colSchema.getName(),
+ colSchema, rootTuple, i));
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
new file mode 100644
index 000000000..224c69ad1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides run-time semantic analysis of the projection list for the
+ * scan operator. The project list can include table columns and a
+ * variety of special columns. Requested columns can exist in the table,
+ * or may be "missing" with null values applied. The code here prepares
+ * a run-time projection plan based on the actual table schema.
+ * <p>
+ * The core concept is one of successive refinement of the project
+ * list through a set of rewrites:
+ * <ul>
+ * <li>Scan-level rewrite: convert {@link SchemaPath} entries into
+ * internal column nodes, tagging the nodes with the column type:
+ * wildcard, unresolved table column, or special columns (such as
+ * file metadata.) The scan-level rewrite is done once per scan
+ * operator.</li>
+ * <li>Reader-level rewrite: convert the internal column nodes into
+ * other internal nodes, leaving table column nodes unresolved. The
+ * typical use is to fill in metadata columns with information about a
+ * specific file.</li>
+ * <li>Schema-level rewrite: given the actual schema of a record batch,
+ * rewrite the reader-level projection to describe the final projection
+ * from incoming data to output container. This step fills in missing
+ * columns, expands wildcards, etc.</li>
+ * </ul>
+ * The following outlines the steps from scan plan to per-file data
+ * loading to producing the output batch. The center path is the
+ * projection metadata which turns into an actual output batch.
+ * <pre>
+ * Scan Plan
+ * |
+ * v
+ * +--------------+
+ * | Project List |
+ * | Parser |
+ * +--------------+
+ * |
+ * v
+ * +------------+
+ * | Scan Level |
+ * | Projection | -----------+
+ * +------------+ |
+ * | |
+ * v v
+ * +------+ +------------+ +------------+ +-----------+
+ * | File | ---> | File Level | | Result Set | ---> | Data File |
+ * | Data | | Projection | | Loader | <--- | Reader |
+ * +------+ +------------+ +------------+ +-----------+
+ * | |
+ * v |
+ * +--------------+ Table |
+ * | Schema Level | Schema |
+ * | Projection | <---------+
+ * +--------------+ |
+ * | |
+ * v |
+ * +--------+ Loaded |
+ * | Output | Vectors |
+ * | Mapper | <------------+
+ * +--------+
+ * |
+ * v
+ * Output Batch
+ * </pre>
+ * <p>
+ * The output mapper includes mechanisms to populate implicit columns, create
+ * null columns, and merge implicit, null, and data columns, omitting
+ * unprojected data columns.
+ * <p>
+ * In all cases, projection must handle maps, which are a recursive structure
+ * much like a row. That is, Drill consists of nested tuples (the row and maps),
+ * each of which contains columns which can be maps. Thus, there is a set of
+ * alternating layers of tuples, columns, tuples, and so on until we get to leaf
+ * (non-map) columns. As a result, most of the above structures are in the form
+ * of tuple trees, requiring recursive algorithms to apply rules down through the
+ * nested layers of tuples.
+ * <p>
+ * The above mechanism is done at runtime, in each scan fragment. Since Drill is
+ * schema-on-read, and has no plan-time schema concept, run-time projection is
+ * required. On the other hand, if Drill were ever to support the "classic"
+ * plan-time schema resolution, then much of this work could be done at plan
+ * time rather than (redundantly) at runtime. The main change would be to do
+ * the work abstractly, working with column and row descriptions, rather than
+ * concretely with vectors as is done here. Then, that abstract description
+ * would feed directly into these mechanisms with the "final answer" about
+ * projection, batch layout, and so on. The parts of this mechanism that
+ * create and populate vectors would remain.
+ */
+
+package org.apache.drill.exec.physical.impl.scan.project; \ No newline at end of file
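To make the lifecycle above concrete, the sketch below (modelled on the tests added in this patch) shows the two rewrite steps for a SELECT * query: the scan-level parse runs once per scan operator, and the schema-level resolution runs once per reader schema. The helper class and method names are illustrative only.

    import java.util.List;

    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
    import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
    import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
    import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
    import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
    import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    // Illustrative sketch, not part of this patch.
    public class ProjectionLifecycleSketch {

      // Scan-level rewrite: parse the physical-plan project list once per scan.
      public static ScanLevelProjection parse(List<SchemaPath> projectList,
          List<ScanProjectionParser> parsers) {
        return new ScanLevelProjection(projectList, parsers);
      }

      // Schema-level rewrite: resolve a wildcard projection against the schema
      // actually reported by the reader. The returned tuple drives the output
      // mapper that assembles the final batch.
      public static ResolvedRow resolveWildcard(ScanLevelProjection scanProj,
          TupleMetadata tableSchema, List<SchemaProjectionResolver> resolvers) {
        NullColumnBuilder nullBuilder = new NullColumnBuilder(null, false);
        ResolvedRow rootTuple = new ResolvedRow(nullBuilder);
        new WildcardSchemaProjection(scanProj, tableSchema, rootTuple, resolvers);
        return rootTuple;
      }
    }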
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
index 0dde5979b..3488e7bb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState;
import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState;
import org.apache.drill.exec.physical.rowSet.impl.UnionState.UnionVectorState;
import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
@@ -111,14 +111,14 @@ public class ListState extends ContainerState
private final ColumnMetadata schema;
private final ListVector vector;
- private final FixedWidthVectorState bitsVectorState;
- private final OffsetVectorState offsetVectorState;
+ private final VectorState bitsVectorState;
+ private final VectorState offsetVectorState;
private VectorState memberVectorState;
public ListVectorState(UnionWriterImpl writer, ListVector vector) {
this.schema = writer.schema();
this.vector = vector;
- bitsVectorState = new FixedWidthVectorState(writer, vector.getBitsVector());
+ bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector());
offsetVectorState = new OffsetVectorState(writer, vector.getOffsetVector(), writer.elementPosition());
memberVectorState = new NullVectorState();
}
@@ -126,7 +126,7 @@ public class ListState extends ContainerState
public ListVectorState(ListWriterImpl writer, WriterPosition elementWriter, ListVector vector) {
this.schema = writer.schema();
this.vector = vector;
- bitsVectorState = new FixedWidthVectorState(writer, vector.getBitsVector());
+ bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector());
offsetVectorState = new OffsetVectorState(writer, vector.getOffsetVector(), elementWriter);
memberVectorState = new NullVectorState();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
index f9c2bff2e..b01292074 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.physical.rowSet.impl;
-import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState;
+import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState;
import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.NullableVector;
@@ -31,15 +31,15 @@ public class NullableVectorState implements VectorState {
private final ColumnMetadata schema;
private final NullableScalarWriter writer;
private final NullableVector vector;
- private final SimpleVectorState bitsState;
- private final SimpleVectorState valuesState;
+ private final VectorState bitsState;
+ private final VectorState valuesState;
public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) {
this.schema = writer.schema();
this.vector = vector;
this.writer = (NullableScalarWriter) writer.scalar();
- bitsState = new FixedWidthVectorState(this.writer.bitsWriter(), vector.getBitsVector());
+ bitsState = new IsSetVectorState(this.writer.bitsWriter(), vector.getBitsVector());
valuesState = SimpleVectorState.vectorState(this.writer.schema(),
this.writer.baseWriter(), vector.getValuesVector());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
index cfb7130ce..e57373caa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
@@ -85,6 +85,23 @@ public abstract class SingleVectorState implements VectorState {
}
}
+ public static class IsSetVectorState extends FixedWidthVectorState {
+
+ public IsSetVectorState(WriterPosition writer, ValueVector mainVector) {
+ super(writer, mainVector);
+ }
+
+ @Override
+ public int allocateVector(ValueVector vector, int cardinality) {
+ int size = super.allocateVector(vector, cardinality);
+
+ // IsSet ("bit") vectors rely on values being initialized to zero (unset.)
+
+ ((FixedWidthVector) vector).zeroVector();
+ return size;
+ }
+ }
+
/**
* State for a scalar value vector. The vector might be for a simple (non-array)
* vector, or might be the payload part of a scalar array (repeated scalar)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
index 45a704c07..cd782c766 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java
@@ -44,13 +44,14 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace;
* <p>
* Examples:<br>
* <code>m</code><br>
- * If m turns out to be a map, project all members of m.<br>
+ * If <code>m</code> turns out to be a map, project all members of
+ * <code>m</code>.<br>
* <code>m.a</code><br>
- * Column m must be a map. Project only column a.<br>
+ * Column <code>m</code> must be a map. Project only column <code>a</code>.<br>
* <code>m, m.a</code><br>
* Tricky case. We interpret this as projecting only the "a" element of map m.
* <p>
- * The projection set is build from a list of columns, represented as
+ * The projection set is built from a list of columns, represented as
* {@link SchemaPath} objects, provided by the physical plan. The structure of
* <tt>SchemaPath</tt> is a bit awkward:
* <p>
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
new file mode 100644
index 000000000..c69357a71
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+public class ScanTestUtils {
+
+ /**
+ * Type-safe way to define a list of parsers.
+ * @param parsers as a varArgs list convenient for testing
+ * @return parsers as a Java List for input to the scan
+ * projection framework
+ */
+
+ public static List<ScanProjectionParser> parsers(ScanProjectionParser... parsers) {
+ return ImmutableList.copyOf(parsers);
+ }
+
+ public static List<SchemaProjectionResolver> resolvers(SchemaProjectionResolver... resolvers) {
+ return ImmutableList.copyOf(resolvers);
+ }
+
+ public static TupleMetadata schema(ResolvedTuple output) {
+ final TupleMetadata schema = new TupleSchema();
+ for (final ResolvedColumn col : output.columns()) {
+ MaterializedField field = col.schema();
+ if (field.getType() == null) {
+
+ // Convert from internal format of null columns (unset type)
+ // to a usable form (explicit minor type of NULL.)
+
+ field = MaterializedField.create(field.getName(),
+ Types.optional(MinorType.NULL));
+ }
+ schema.add(field);
+ }
+ return schema;
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
new file mode 100644
index 000000000..c524577d9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Drill allows file metadata columns (also called "implicit" columns.)
+ * These are columns that contain long repeated sequences of the same
+ * values. The ConstantColumnLoader builds and populates these columns.
+ */
+
+public class TestConstantColumnLoader extends SubOperatorTest {
+
+ private static class DummyColumn implements ConstantColumnSpec {
+
+ private final String name;
+ private final MaterializedField schema;
+ private final String value;
+
+ public DummyColumn(String name, MajorType type, String value) {
+ this.name = name;
+ this.schema = MaterializedField.create(name, type);
+ this.value = value;
+ }
+
+ @Override
+ public String name() { return name; }
+
+ @Override
+ public MaterializedField schema() { return schema; }
+
+ @Override
+ public String value() { return value; }
+ }
+
+ /**
+   * Test the constant column loader using one column of each cardinality:
+   * a required VARCHAR and a nullable (optional) VARCHAR. The loader fills
+   * every row of the generated batch with the constant value defined for
+   * each column.
+ */
+
+ @Test
+ public void testConstantColumnLoader() {
+
+ final MajorType aType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.REQUIRED)
+ .build();
+ final MajorType bType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+
+ final List<ConstantColumnSpec> defns = new ArrayList<>();
+ defns.add(
+ new DummyColumn("a", aType, "a-value" ));
+ defns.add(
+ new DummyColumn("b", bType, "b-value" ));
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ final ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
+
+ // Create a batch
+
+ staticLoader.load(2);
+
+ // Verify
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", aType)
+ .add("b", bType)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("a-value", "b-value")
+ .addRow("a-value", "b-value")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
+ staticLoader.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
new file mode 100644
index 000000000..4bf5a6ad9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Test;
+
+/**
+ * Test the mechanism that handles all-null columns during projection.
+ * An all-null column is one projected in the query, but which does
+ * not actually exist in the underlying data source (or input
+ * operator.)
+ * <p>
+ * In anticipation of having type information, this mechanism
+ * can create the classic nullable Int null column, or one of
+ * any other type and mode.
+ */
+
+public class TestNullColumnLoader extends SubOperatorTest {
+
+ private ResolvedNullColumn makeNullCol(String name, MajorType nullType) {
+
+ // For this test, we don't need the projection, so just
+ // set it to null.
+
+ return new ResolvedNullColumn(name, nullType, null, 0);
+ }
+
+ private ResolvedNullColumn makeNullCol(String name) {
+ return makeNullCol(name, null);
+ }
+
+ /**
+ * Test the simplest case: default null type, nothing in the vector
+ * cache. Specify no column type, the special NULL type, or a
+ * predefined type. Output types should be set accordingly.
+ */
+
+ @Test
+ public void testBasics() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("unspecified", null));
+ defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL)));
+ defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR)));
+ defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR)));
+ defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR)));
+
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .addNullable("specifiedOpt", MinorType.VARCHAR)
+ .addNullable("specifiedReq", MinorType.VARCHAR)
+ .addArray("specifiedArray", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null, null, null, new String[] {})
+ .addRow(null, null, null, null, new String[] {})
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Test the ability to use a type other than nullable INT for null
+ * columns. This occurs, for example, in the CSV reader where no
+ * column is ever INT (nullable or otherwise) and we want our null
+ * columns to be (non-nullable) VARCHAR.
+ */
+
+ @Test
+ public void testCustomNullType() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("unspecified", null));
+ defns.add(makeNullCol("nullType", MajorType.newBuilder()
+ .setMinorType(MinorType.NULL)
+ .setMode(DataMode.OPTIONAL)
+ .build()));
+
+ // Null required is an oxymoron, so is not tested.
+ // Null type array does not make sense, so is not tested.
+
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ final MajorType nullType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("unspecified", nullType)
+ .add("nullType", nullType)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null)
+ .addRow(null, null)
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Drill requires "schema persistence": if a scan operator
+ * reads two files, F1 and F2, then the scan operator must
+ * provide the same vectors from both readers. Not just the
+ * same types, the same value vector instances (but, of course,
+ * populated with different data.)
+ * <p>
+ * Test the case in which the reader for F1 found columns
+   * (a, b, c) but F2 found only (a, b), requiring that we
+   * fill in column c with nulls, using the same type that it
+   * had in file F1. We use a vector cache to pull off this trick.
+ * This test ensures that the null column mechanism looks in that
+ * vector cache when asked to create a nullable column.
+ */
+
+ @Test
+ public void testCachedTypesMapToNullable() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("req"));
+ defns.add(makeNullCol("opt"));
+ defns.add(makeNullCol("rep"));
+ defns.add(makeNullCol("unk"));
+
+ // Populate the cache with a column of each mode.
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
+ final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
+ final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
+
+ // Use nullable Varchar for unknown null columns.
+
+ final MajorType nullType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify vectors are reused
+
+ assertSame(opt, output.getValueVector(1).getValueVector());
+ assertSame(rep, output.getValueVector(2).getValueVector());
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .addNullable("req", MinorType.FLOAT8)
+ .addNullable("opt", MinorType.FLOAT8)
+ .addArray("rep", MinorType.FLOAT8)
+ .addNullable("unk", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null, new int[] { }, null)
+ .addRow(null, null, new int[] { }, null)
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Suppose, in the previous test, that one of the columns that
+ * goes missing is a required column. The null-column mechanism can
+ * create the "null" column as a required column, then fill it with
+ * empty values (zero or "") -- if the scan operator feels doing so would
+ * be helpful.
+ */
+
+ @Test
+ public void testCachedTypesAllowRequired() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("req"));
+ defns.add(makeNullCol("opt"));
+ defns.add(makeNullCol("rep"));
+ defns.add(makeNullCol("unk"));
+
+ // Populate the cache with a column of each mode.
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
+ final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
+ final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
+
+ // Use nullable Varchar for unknown null columns.
+
+ final MajorType nullType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify vectors are reused
+
+ assertSame(opt, output.getValueVector(1).getValueVector());
+ assertSame(rep, output.getValueVector(2).getValueVector());
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("req", MinorType.FLOAT8)
+ .addNullable("opt", MinorType.FLOAT8)
+ .addArray("rep", MinorType.FLOAT8)
+ .addNullable("unk", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(0.0, null, new int[] { }, null)
+ .addRow(0.0, null, new int[] { }, null)
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Test the shim class that adapts between the null column loader
+ * and the projection mechanism. The projection mechanism uses this
+ * to pull in the null columns which the null column loader has
+ * created.
+ */
+
+ @Test
+ public void testNullColumnBuilder() {
+
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
+ builder.add("unspecified");
+ builder.add("nullType", Types.optional(MinorType.NULL));
+ builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR));
+ builder.add("specifiedReq", Types.required(MinorType.VARCHAR));
+ builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR));
+ builder.build(cache);
+
+ // Create a batch
+
+ builder.load(2);
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .addNullable("specifiedOpt", MinorType.VARCHAR)
+ .addNullable("specifiedReq", MinorType.VARCHAR)
+ .addArray("specifiedArray", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null, null, null, new String[] {})
+ .addRow(null, null, null, null, new String[] {})
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(builder.output()));
+ builder.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
new file mode 100644
index 000000000..cb0365ca9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+
+
+/**
+ * Test the row batch merger by merging two batches. Tests both the
+ * "direct" and "exchange" cases. Direct means that the output container
+ * contains the source vector directly: they are the same vectors.
+ * Exchange means we have two vectors, but we swap the underlying
+ * Drillbufs to effectively shift data from source to destination
+ * vector.
+ */
+
+public class TestRowBatchMerger extends SubOperatorTest {
+
+ public static class RowSetSource implements VectorSource {
+
+ private SingleRowSet rowSet;
+
+ public RowSetSource(SingleRowSet rowSet) {
+ this.rowSet = rowSet;
+ }
+
+ public RowSet rowSet() { return rowSet; }
+
+ public void clear() {
+ rowSet.clear();
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return rowSet.container().getValueVector(index).getValueVector();
+ }
+ }
+
+ public static final int SLAB_SIZE = 16 * 1024 * 1024;
+
+ @BeforeClass
+ public static void setup() {
+    // Fill 10 16MB blocks of memory with 0xDEADBEEF, then release them, so that
+    // later allocations which reuse this memory expose any vector code that
+    // assumes memory is zeroed rather than explicitly initialized.
+ DrillBuf bufs[] = new DrillBuf[10];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = fixture.allocator().buffer(SLAB_SIZE);
+ for (int j = 0; j < SLAB_SIZE / 4; j++) {
+ bufs[i].setInt(j * 4, 0xDEADBEEF);
+ }
+ }
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i].release();
+ }
+ }
+
+ private RowSetSource makeFirst() {
+ BatchSchema firstSchema = new SchemaBuilder()
+ .add("d", MinorType.VARCHAR)
+ .add("a", MinorType.INT)
+ .build();
+ return new RowSetSource(
+ fixture.rowSetBuilder(firstSchema)
+ .addRow("barney", 10)
+ .addRow("wilma", 20)
+ .build());
+ }
+
+ private RowSetSource makeSecond() {
+ BatchSchema secondSchema = new SchemaBuilder()
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .build();
+ return new RowSetSource(
+ fixture.rowSetBuilder(secondSchema)
+ .addRow(1, "foo.csv")
+ .addRow(2, "foo.csv")
+ .build());
+ }
+
+ public static class TestProjection extends ResolvedColumn {
+
+ public TestProjection(VectorSource source, int sourceIndex) {
+ super(source, sourceIndex);
+ }
+
+ @Override
+ public String name() { return null; }
+
+ @Override
+ public int nodeType() { return -1; }
+
+ @Override
+ public MaterializedField schema() { return null; }
+ }
+
+ @Test
+ public void testSimpleFlat() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create the second batch
+
+ RowSetSource second = makeSecond();
+
+ ResolvedRow resolvedTuple = new ResolvedRow(null);
+ resolvedTuple.add(new TestProjection(first, 1));
+ resolvedTuple.add(new TestProjection(second, 0));
+ resolvedTuple.add(new TestProjection(second, 1));
+ resolvedTuple.add(new TestProjection(first, 0));
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(null, output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, 1, "foo.csv", "barney")
+ .addRow(20, 2, "foo.csv", "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ @Test
+ public void testImplicitFlat() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create the second batch
+
+ RowSetSource second = makeSecond();
+
+ ResolvedRow resolvedTuple = new ResolvedRow(null);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+ resolvedTuple.add(new TestProjection(second, 0));
+ resolvedTuple.add(new TestProjection(second, 1));
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, 1, "foo.csv", "barney")
+ .addRow(20, 2, "foo.csv", "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ @Test
+ public void testFlatWithNulls() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create null columns
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+ resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
+ resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ builder.build(cache);
+ builder.load(first.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .addNullable("null2", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, null, null, "barney")
+ .addRow(20, null, null, "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ builder.close();
+ }
+
+ /**
+ * Test the ability to create maps from whole cloth if requested in
+   * the projection list, but the map is not available from the data
+ * source.
+ */
+
+ @Test
+ public void testNullMaps() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create null columns
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+
+ ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
+ ResolvedTuple nullMap = nullMapCol.members();
+ nullMap.add(nullMap.nullBuilder().add("null1"));
+ nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+
+ ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
+ ResolvedTuple nullMap2 = nullMapCol2.members();
+ nullMap2.add(nullMap2.nullBuilder().add("null3"));
+ nullMap.add(nullMapCol2);
+
+ resolvedTuple.add(nullMapCol);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // LoadNulls
+
+ resolvedTuple.loadNulls(first.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ resolvedTuple.setRowCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("map1")
+ .addNullable("null1", MinorType.INT)
+ .addNullable("null2", MinorType.VARCHAR)
+ .addMap("map2")
+ .addNullable("null3", MinorType.INT)
+ .resumeMap()
+ .resumeSchema()
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, mapValue(null, null, singleMap(null)), "barney")
+ .addRow(20, mapValue(null, null, singleMap(null)), "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ resolvedTuple.close();
+ }
+
+ /**
+ * Test that the merger mechanism can rewrite a map to include
+ * projected null columns.
+ */
+
+ @Test
+ public void testMapRevision() {
+
+ // Create the first batch
+
+ BatchSchema inputSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMap("a")
+ .add("c", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSetSource input = new RowSetSource(
+ fixture.rowSetBuilder(inputSchema)
+ .addRow("barney", singleMap(10))
+ .addRow("wilma", singleMap(20))
+ .build());
+
+ // Create mappings
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+ inputSchema.getColumn(1), 1);
+ resolvedTuple.add(mapCol);
+ ResolvedTuple map = mapCol.members();
+ map.add(new TestProjection(map, 0));
+ map.add(map.nullBuilder().add("null1"));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // LoadNulls
+
+ resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(input.rowSet().container(), output);
+ output.setRecordCount(input.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMap("a")
+ .add("c", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("barney", mapValue(10, null))
+ .addRow("wilma", mapValue(20, null))
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ /**
+ * Test that the merger mechanism can rewrite a map array to include
+ * projected null columns.
+ */
+
+ @Test
+ public void testMapArrayRevision() {
+
+ // Create the first batch
+
+ BatchSchema inputSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMapArray("a")
+ .add("c", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSetSource input = new RowSetSource(
+ fixture.rowSetBuilder(inputSchema)
+ .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
+ .addRow("wilma", mapArray(singleMap(20), singleMap(21)))
+ .build());
+
+ // Create mappings
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+ inputSchema.getColumn(1), 1);
+ resolvedTuple.add(mapCol);
+ ResolvedTuple map = mapCol.members();
+ map.add(new TestProjection(map, 0));
+ map.add(map.nullBuilder().add("null1"));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // LoadNulls
+
+ resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(input.rowSet().container(), output);
+ output.setRecordCount(input.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMapArray("a")
+ .add("c", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("barney", mapArray(
+ mapValue(10, null), mapValue(11, null), mapValue(12, null)))
+ .addRow("wilma", mapArray(
+ mapValue(20, null), mapValue(21, null)))
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
new file mode 100644
index 000000000..e07ad9a62
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+/**
+ * Test the projection planning done at the level of the scan as a whole,
+ * before knowledge of table "implicit" columns or the specific table schema.
+ */
+
+public class TestScanLevelProjection extends SubOperatorTest {
+
+ /**
+ * Basic test: select a set of columns (a, b, c) when the
+   * data source has an early schema of (a, c, d). (a, c) are
+   * projected, (b) is filled with nulls, and (d) is left unprojected.
+ */
+
+ @Test
+ public void testBasics() {
+
+ // Simulate SELECT a, b, c ...
+ // Build the projection plan and verify
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", "b", "c"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(3, scanProj.requestedCols().size());
+ assertEquals("a", scanProj.requestedCols().get(0).rootName());
+ assertEquals("b", scanProj.requestedCols().get(1).rootName());
+ assertEquals("c", scanProj.requestedCols().get(2).rootName());
+
+ assertEquals(3, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+ assertEquals("b", scanProj.columns().get(1).name());
+ assertEquals("c", scanProj.columns().get(2).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+ }
+
+ /**
+ * Map projection occurs when a query contains project-list items with
+ * a dot, such as "a.b". We may not know the type of "b", but have
+ * just learned that "a" must be a map.
+ */
+
+ @Test
+ public void testMap() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(3, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+ assertEquals("b", scanProj.columns().get(1).name());
+ assertEquals("c", scanProj.columns().get(2).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+ // Map structure
+
+ final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+ assertTrue(a.isTuple());
+ assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
+ assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
+ assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z"));
+
+ final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
+ assertTrue(c.isSimple());
+ }
+
+ /**
+ * Similar to maps, if the project list contains "a[1]" then we've learned that
+ * a is an array, but we don't know its element type.
+ */
+
+ @Test
+ public void testArray() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[1]", "a[3]"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+ // Array structure
+
+ final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+ assertTrue(a.isArray());
+ assertFalse(a.hasIndex(0));
+ assertTrue(a.hasIndex(1));
+ assertFalse(a.hasIndex(2));
+ assertTrue(a.hasIndex(3));
+ }
+
+ /**
+ * Simulate a SELECT * query by passing "*" as a column name.
+ */
+
+ @Test
+ public void testWildcard() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ assertTrue(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+ assertEquals(1, scanProj.requestedCols().size());
+ assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
+
+ // Verify bindings
+
+ assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
+ }
+
+ /**
+ * Test an empty projection which occurs in a
+ * SELECT COUNT(*) query.
+ */
+
+ @Test
+ public void testEmptyProjection() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(),
+ ScanTestUtils.parsers());
+
+ assertFalse(scanProj.projectAll());
+ assertTrue(scanProj.projectNone());
+ assertEquals(0, scanProj.requestedCols().size());
+ }
+
+ /**
+ * Can't include both a wildcard and a column name.
+ */
+
+ @Test
+ public void testErrorWildcardAndColumns() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Can't include both a column name and a wildcard.
+ */
+
+ @Test
+ public void testErrorColumnAndWildcard() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Can't include a wildcard twice.
+ * <p>
+ * Note: Drill actually allows this, but the work should be done
+ * in the project operator; scan should see at most one wildcard.
+ */
+
+ @Test
+ public void testErrorTwoWildcards() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final UserException e) {
+ // Expected
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java
new file mode 100644
index 000000000..839e2caa4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * "Schema level projection" describes one side of the projection
+ * mechanism. When we project, we have the set of column the user
+ * wants "the schema level" and the set of columns on offer from
+ * the data source "the scan level." The projection mechanism
+ * combines these to map out the actual projection.
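+ * <p>
+ * The tests below all share one outline; a minimal sketch (using only
+ * the classes exercised in this file, with <code>tableSchema</code>
+ * standing in for whatever schema the reader offers) is:
+ * <pre><code>
+ *   ScanLevelProjection scanProj = new ScanLevelProjection(
+ *       RowSetTestUtils.projectList("c", "a"), ScanTestUtils.parsers());
+ *   ResolvedRow rootTuple = new ResolvedRow(new NullColumnBuilder(null, false));
+ *   new ExplicitSchemaProjection(scanProj, tableSchema, rootTuple,
+ *       ScanTestUtils.resolvers());
+ *   List&lt;ResolvedColumn&gt; columns = rootTuple.columns();
+ * </code></pre>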
+ */
+
+public class TestSchemaLevelProjection extends SubOperatorTest {
+
+ /**
+ * Test wildcard projection: take all columns on offer from
+ * the data source, in the order that the data source specifies.
+ */
+
+ @Test
+ public void testWildcard() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .addNullable("c", MinorType.INT)
+ .addArray("d", MinorType.FLOAT8)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new WildcardSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ assertEquals("a", columns.get(0).name());
+ assertEquals(0, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+ assertEquals("c", columns.get(1).name());
+ assertEquals(1, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+ assertEquals("d", columns.get(2).name());
+ assertEquals(2, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+ }
+
+ /**
+ * Test a SELECT list with columns in a different order, and with
+ * name case different from that of the early-schema table.
+ */
+
+ @Test
+ public void testFullList() {
+
+ // Simulate SELECT c, b, a ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("c", "b", "a"),
+ ScanTestUtils.parsers());
+ assertEquals(3, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a, b, c)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("B", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ assertEquals("c", columns.get(0).name());
+ assertEquals(2, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+
+ assertEquals("b", columns.get(1).name());
+ assertEquals(1, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+
+ assertEquals("a", columns.get(2).name());
+ assertEquals(0, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+ }
+
+ /**
+ * Test SELECT list with columns missing from the table schema.
+ */
+
+ @Test
+ public void testMissing() {
+
+ // Simulate SELECT c, v, b, w ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("c", "v", "b", "w"),
+ ScanTestUtils.parsers());
+ assertEquals(4, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a, b, c)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("B", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(4, columns.size());
+ final VectorSource nullBuilder = rootTuple.nullBuilder();
+
+ assertEquals("c", columns.get(0).name());
+ assertEquals(2, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+
+ assertEquals("v", columns.get(1).name());
+ assertEquals(0, columns.get(1).sourceIndex());
+ assertSame(nullBuilder, columns.get(1).source());
+
+ assertEquals("b", columns.get(2).name());
+ assertEquals(1, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+
+ assertEquals("w", columns.get(3).name());
+ assertEquals(1, columns.get(3).sourceIndex());
+ assertSame(nullBuilder, columns.get(3).source());
+ }
+
+ /**
+ * Test an explicit projection (providing columns) in which the
+ * names in the project list use a different case than the data
+ * source, the order of columns differs, and we ask for only a
+ * subset of the data source columns.
+ */
+ @Test
+ public void testSubset() {
+
+ // Simulate SELECT c, a ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("c", "a"),
+ ScanTestUtils.parsers());
+ assertEquals(2, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a, b, c)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("B", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(2, columns.size());
+
+ assertEquals("c", columns.get(0).name());
+ assertEquals(2, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+
+ assertEquals("a", columns.get(1).name());
+ assertEquals(0, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+ }
+
+ /**
+ * Drill is unique in that we can select (a, b) from a data source
+ * that only offers (c, d). We get null columns as a result.
+ */
+
+ @Test
+ public void testDisjoint() {
+
+ // Simulate SELECT b ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("b"),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(1, columns.size());
+ final VectorSource nullBuilder = rootTuple.nullBuilder();
+
+ assertEquals("b", columns.get(0).name());
+ assertEquals(0, columns.get(0).sourceIndex());
+ assertSame(nullBuilder, columns.get(0).source());
+ }
+
+ /**
+ * Test the obscure case in which the project list references a member
+ * of a map ("b.c.d") that the data source does not offer at all. The
+ * output should be a map (with a nested map) containing null versions
+ * of only the members we request.
+ */
+
+ @Test
+ public void testOmittedMap() {
+
+ // Simulate SELECT a, b.c.d ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", "b.c.d"),
+ ScanTestUtils.parsers());
+ assertEquals(2, scanProj.columns().size());
+ {
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType());
+ final UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1));
+ assertTrue(bCol.element().isTuple());
+ }
+
+ // Simulate a data source, with early schema, of (a)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(2, columns.size());
+
+ // Should have resolved a to a table column, b to a missing map.
+
+ // A is projected
+
+ final ResolvedColumn aCol = columns.get(0);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+
+ // B is not projected, is implicitly a map
+
+ final ResolvedColumn bCol = columns.get(1);
+ assertEquals("b", bCol.name());
+ assertEquals(ResolvedMapColumn.ID, bCol.nodeType());
+
+ final ResolvedMapColumn bMap = (ResolvedMapColumn) bCol;
+ final ResolvedTuple bMembers = bMap.members();
+ assertNotNull(bMembers);
+ assertEquals(1, bMembers.columns().size());
+
+ // C is a map within b
+
+ final ResolvedColumn cCol = bMembers.columns().get(0);
+ assertEquals(ResolvedMapColumn.ID, cCol.nodeType());
+
+ final ResolvedMapColumn cMap = (ResolvedMapColumn) cCol;
+ final ResolvedTuple cMembers = cMap.members();
+ assertNotNull(cMembers);
+ assertEquals(1, cMembers.columns().size());
+
+ // D is of unknown type (not a map), resolved as a null column
+
+ final ResolvedColumn dCol = cMembers.columns().get(0);
+ assertEquals(ResolvedNullColumn.ID, dCol.nodeType());
+ }
+
+ /**
+ * Test of a map with missing columns:
+ * table of (x, y, a{b, c}), project x, a.c, a.d, a.e.f, y.
+ */
+
+ @Test
+ public void testOmittedMapMembers() {
+
+ // Simulate SELECT x, a.c, a.d, a.e.f, y ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
+ ScanTestUtils.parsers());
+ assertEquals(3, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (x, y, a{b, c})
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("x", MinorType.VARCHAR)
+ .add("y", MinorType.INT)
+ .addMap("a")
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .resumeSchema()
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ // Should have resolved a to a map column, a.c to a map member,
+ // a.d to a null column, and a.e.f to a member of a missing
+ // nested map
+
+ // X is projected
+
+ final ResolvedColumn xCol = columns.get(0);
+ assertEquals("x", xCol.name());
+ assertEquals(ResolvedTableColumn.ID, xCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source());
+ assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex());
+
+ // Y is projected
+
+ final ResolvedColumn yCol = columns.get(2);
+ assertEquals("y", yCol.name());
+ assertEquals(ResolvedTableColumn.ID, yCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source());
+ assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex());
+
+ // A is projected
+
+ final ResolvedColumn aCol = columns.get(1);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedMapColumn.ID, aCol.nodeType());
+
+ final ResolvedMapColumn aMap = (ResolvedMapColumn) aCol;
+ final ResolvedTuple aMembers = aMap.members();
+ assertFalse(aMembers.isSimpleProjection());
+ assertNotNull(aMembers);
+ assertEquals(3, aMembers.columns().size());
+
+ // a.c is projected
+
+ final ResolvedColumn acCol = aMembers.columns().get(0);
+ assertEquals("c", acCol.name());
+ assertEquals(ResolvedTableColumn.ID, acCol.nodeType());
+ assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex());
+
+ // a.d is not in the table, is null
+
+ final ResolvedColumn adCol = aMembers.columns().get(1);
+ assertEquals("d", adCol.name());
+ assertEquals(ResolvedNullColumn.ID, adCol.nodeType());
+
+ // a.e is not in the table, is implicitly a map
+
+ final ResolvedColumn aeCol = aMembers.columns().get(2);
+ assertEquals("e", aeCol.name());
+ assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
+
+ final ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
+ final ResolvedTuple aeMembers = aeMap.members();
+ assertFalse(aeMembers.isSimpleProjection());
+ assertNotNull(aeMembers);
+ assertEquals(1, aeMembers.columns().size());
+
+ // a.e.f is a null column
+
+ final ResolvedColumn aefCol = aeMembers.columns().get(0);
+ assertEquals("f", aefCol.name());
+ assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
+ }
+
+ /**
+ * Simple map project. This is an internal case in which the
+ * query asks for a set of columns inside a map, and the table
+ * loader produces exactly that set. No special projection is
+ * needed; the map is projected as a whole.
+ */
+
+ @Test
+ public void testSimpleMapProject() {
+
+ // Simulate SELECT a.b, a.c ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.b", "a.c"),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a{b, c})
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .addMap("a")
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .resumeSchema()
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(1, columns.size());
+
+ // Should have resolved the whole map a to a single table column
+
+ // a is projected as a vector, not as a structured map
+
+ final ResolvedColumn aCol = columns.get(0);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+ assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+ }
+
+ /**
+ * Project of a non-map as a map
+ */
+
+ @Test
+ public void testMapMismatch() {
+
+ // Simulate SELECT a.b ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.b"),
+ ScanTestUtils.parsers());
+
+ // Simulate a data source, with early schema, of (a)
+ // where a is not a map.
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ try {
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final UserException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Test projection of an array. At this level, we just verify
+ * that the requested column is, indeed, an array.
+ */
+
+ @Test
+ public void testArrayProject() {
+
+ // Simulate SELECT a[0] ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[0]"),
+ ScanTestUtils.parsers());
+
+ // Simulate a data source, with early schema, of (a)
+ // where a is an array.
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .addArray("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(1, columns.size());
+
+ final ResolvedColumn aCol = columns.get(0);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+ assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+ }
+
+ /**
+ * Project of a non-array as an array
+ */
+
+ @Test
+ public void testArrayMismatch() {
+
+ // Simulate SELECT a[0] ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[0]"),
+ ScanTestUtils.parsers());
+
+ // Simulate a data source, with early schema, of (a)
+ // where a is not an array.
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ try {
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final UserException e) {
+ // Expected
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
new file mode 100644
index 000000000..977cd216b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests schema smoothing at the schema projection level.
+ * This level handles reusing prior types when filling null
+ * values. But, because no actual vectors are involved, it
+ * does not handle the schema chosen for a table ahead of
+ * time, only the schema as it is merged with the prior schema to
+ * detect missing columns.
+ * <p>
+ * Focuses on the <tt>SmoothingProjection</tt> class itself.
+ * <p>
+ * Note that, at present, schema smoothing does not work for entire
+ * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt>
+ * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
+ * not currently know how to recreate the map. The same is true of
+ * lists and unions. Handling such cases is complex and is probably
+ * better handled via a system that allows the user to specify their
+ * intent by providing a schema to apply to the two files.
+ * <p>
+ * Note that schema smoothing itself is an experimental work-around
+ * to a fundamental limitation in Drill:
+ * <ul>
+ * <li>Drill cannot predict the future: each file (or batch)
+ * may have a different schema.</li>
+ * <li>Drill does not know about these differences until they
+ * occur.</li>
+ * <li>The scan operator is obligated to return the same schema
+ * (and same vectors) from one file to the next, else a "hard"
+ * schema change is sent down stream.</li>
+ * </ul>
+ *
+ * The problem is actually intractable. The schema smoother handles the
+ * cases that can be handled, such as required --> nullable, a column
+ * disappearing, etc. This whole mechanism should be scrapped if/when
+ * Drill can work with schemas. Or, keep this to handle, as best we can,
+ * odd schemas, but insist on a schema to resolve issues that this
+ * mechanism cannot handle (and that, indeed, no algorithm could handle
+ * because such an algorithm would require time-travel: looking into
+ * the future to know what data will be scanned).
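+ * <p>
+ * The per-schema tests below share one driver pattern; a minimal
+ * sketch (using only the classes exercised in this file) is:
+ * <pre><code>
+ *   SchemaSmoother smoother = new SchemaSmoother(scanProj, ScanTestUtils.resolvers());
+ *   // For each file (or batch) schema the scan encounters:
+ *   ResolvedRow rootTuple = new ResolvedRow(new NullColumnBuilder(null, false));
+ *   smoother.resolve(tableSchema, rootTuple);
+ *   smoother.schemaVersion();  // bumps only when the prior schema is discarded
+ * </code></pre>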
+ */
+
+public class TestSchemaSmoothing extends SubOperatorTest {
+
+ /**
+ * Low-level test of the smoothing projection, including the exceptions
+ * it throws when things are not going its way.
+ */
+
+ @Test
+ public void testSmoothingProjection() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ // Table 1: (a: nullable bigint, b: nullable varchar, c: double)
+
+ final TupleMetadata schema1 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ ResolvedRow priorSchema;
+ {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new WildcardSchemaProjection(
+ scanProj, schema1, rootTuple,
+ ScanTestUtils.resolvers());
+ priorSchema = rootTuple;
+ }
+
+ // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+ final TupleMetadata schema2 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema2, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ priorSchema = rootTuple;
+ } catch (final IncompatibleSchemaException e) {
+ fail();
+ }
+
+ // Table 3: (a, b, c, d), column added, must replan schema
+
+ final TupleMetadata schema3 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .add("d", MinorType.INT)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema3, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final IncompatibleSchemaException e) {
+ // Expected
+ }
+
+ // Table 4: (a: double), type change, must replan schema
+
+ final TupleMetadata schema4 = new SchemaBuilder()
+ .addNullable("a", MinorType.FLOAT8)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema4, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final IncompatibleSchemaException e) {
+ // Expected
+ }
+
+ // Table 5: Drop a non-nullable column, must replan
+
+ final TupleMetadata schema6 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema6, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final IncompatibleSchemaException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Case in which the table schema is a superset of the prior
+ * schema. Discard prior schema. Turn off auto expansion of
+ * metadata for a simpler test.
+ */
+
+ @Test
+ public void testSmaller() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ smoother.resolve(priorSchema, rootTuple);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ }
+ {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ smoother.resolve(tableSchema, rootTuple);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ /**
+ * Case in which the table schema and prior are disjoint
+ * sets. Discard the prior schema.
+ */
+
+ @Test
+ public void testDisjoint() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ smoother.resolve(schema, rootTuple);
+ return rootTuple;
+ }
+
+ /**
+ * Column names match, but types differ. Discard the prior schema.
+ */
+
+ @Test
+ public void testDifferentTypes() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ /**
+ * The prior and table schemas are identical. Preserve the prior
+ * schema (though the output is no different from what we would get
+ * if we discarded the prior schema).
+ */
+
+ @Test
+ public void testSameSchemas() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
+ assertTrue(actualSchema.isEquivalent(tableSchema));
+ assertTrue(actualSchema.isEquivalent(priorSchema));
+ }
+ }
+
+ /**
+ * The prior and table schemas are identical, but the cases of names differ.
+ * Preserve the case of the first schema.
+ */
+
+ @Test
+ public void testDifferentCase() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.INT)
+ .add("B", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals("a", columns.get(0).name());
+ }
+ }
+
+ /**
+ * Can't preserve the prior schema if it had required columns
+ * that the new schema omits.
+ */
+
+ @Test
+ public void testRequired() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ /**
+ * Preserve the prior schema if the table is a subset and the missing
+ * columns are nullable or repeated.
+ */
+
+ @Test
+ public void testMissingNullableColumns() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .addNullable("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addArray("c", MinorType.BIGINT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ }
+ }
+
+ /**
+ * Preserve the prior schema if the table is a subset. Map the table
+ * columns to the output using the prior schema ordering.
+ */
+
+ @Test
+ public void testReordering() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .addNullable("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addArray("c", MinorType.BIGINT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addNullable("a", MinorType.INT)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ }
+ }
+
+ /**
+ * Integrated test across multiple schemas at the batch level.
+ */
+
+ @Test
+ public void testSmoothableSchemaBatches() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ // Table 1: (a: nullable bigint, b: nullable varchar, c: double)
+
+ final TupleMetadata schema1 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema1);
+
+ // Just use the original schema.
+
+ assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(1, smoother.schemaVersion());
+ }
+
+ // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+ final TupleMetadata schema2 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema2);
+ assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(1, smoother.schemaVersion());
+ }
+
+ // Table 3: (a, b, c, d), column added, must replan schema
+
+ final TupleMetadata schema3 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .add("d", MinorType.INT)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema3);
+ assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(2, smoother.schemaVersion());
+ }
+
+ // Table 4: Drop a non-nullable column, must replan
+
+ final TupleMetadata schema4 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema4);
+ assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(3, smoother.schemaVersion());
+ }
+
+ // Table 5: (a: double), type change, must replan schema
+
+ final TupleMetadata schema5 = new SchemaBuilder()
+ .addNullable("a", MinorType.FLOAT8)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema5);
+ assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(4, smoother.schemaVersion());
+ }
+ }
+
+ /**
+ * A SELECT * query uses the schema of the table as the output schema.
+ * This is trivial when the scanner has one table. But, if two or more
+ * tables occur, then things get interesting. The first table sets the
+ * schema. The second table then has:
+ * <ul>
+ * <li>The same schema, trivial case.</li>
+ * <li>A subset of the first table. The type of the "missing" column
+ * from the first table is used for a null column in the second table.</li>
+ * <li>A superset or disjoint set of the first schema. This triggers a hard schema
+ * change.</li>
+ * </ul>
+ * <p>
+ * It is an open question whether previous columns should be preserved on
+ * a hard reset. For now, the code implements, and this test verifies, that a
+ * hard reset clears the "memory" of prior schemas.
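+ * <p>
+ * Concretely, the test below walks this sequence of table schemas and
+ * expected outputs:
+ * <pre>
+ *   (a, b, c)  -&gt;  output (a, b, c), schema version v
+ *   (a, b, c)  -&gt;  output (a, b, c), still version v
+ *   (b, a)     -&gt;  output (a, b, c) with c filled with nulls, still v
+ *   (a, b, d)  -&gt;  output (a, b, d), new schema version
+ * </pre>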
+ */
+
+ @Test
+ public void testWildcardSmoothing() {
+ final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
+ projector.enableSchemaSmoothing(true);
+ projector.build(RowSetTestUtils.projectAll());
+
+ final TupleMetadata firstSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .addNullable("c", MinorType.BIGINT)
+ .buildSchema();
+ final TupleMetadata subsetSchema = new SchemaBuilder()
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final TupleMetadata disjointSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .add("d", MinorType.VARCHAR)
+ .buildSchema();
+
+ final SchemaTracker tracker = new SchemaTracker();
+ int schemaVersion;
+ {
+ // First table, establishes the baseline
+ // ... FROM table 1
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow(10, "fred", 110L)
+ .addRow(20, "wilma", 110L);
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ schemaVersion = tracker.schemaVersion();
+
+ final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+ .addRow(10, "fred", 110L)
+ .addRow(20, "wilma", 110L)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+ {
+ // Second table, same schema, the trivial case
+ // ... FROM table 2
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow(70, "pebbles", 770L)
+ .addRow(80, "hoppy", 880L);
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ assertEquals(schemaVersion, tracker.schemaVersion());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+ .addRow(70, "pebbles", 770L)
+ .addRow(80, "hoppy", 880L)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+ {
+ // Third table: subset schema of first two
+ // ... FROM table 3
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow("bambam", 30)
+ .addRow("betty", 40);
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ assertEquals(schemaVersion, tracker.schemaVersion());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+ .addRow(30, "bambam", null)
+ .addRow(40, "betty", null)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+ {
+ // Fourth table: disjoint schema, causes a schema reset
+ // ... FROM table 4
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow(50, "dino", "supporting")
+ .addRow(60, "barney", "main");
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ assertNotEquals(schemaVersion, tracker.schemaVersion());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
+ .addRow(50, "dino", "supporting")
+ .addRow(60, "barney", "main")
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+
+ projector.close();
+ }
+
+ // TODO: Test schema smoothing with repeated
+ // TODO: Test hard schema change
+ // TODO: Typed null column tests (resurrect)
+ // TODO: Test maps and arrays of maps
+}