Diffstat (limited to 'exec')
33 files changed, 5741 insertions, 12 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java new file mode 100644 index 000000000..0540d0be1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ColumnProjection.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +/** + * Core interface for a projected column. Models a column throughout the + * projection lifecycle. Columns evolve from unresolved to resolved at + * different times. Each class that derives from this interface can act + * as a column "node", while declaring its node type so it can be + * processed according to that type. + * <p> + * For example, an implicit column is processed at the file schema + * resolution phase, converting from unresolved to resolved. At the same + * time, table columns remain unresolved, waiting for the table schema + * to appear. + * <p> + * In an advanced, experimental feature, schema persistence sees some + * columns transition from resolved to unresolved and back again. + * <p> + * Having all column nodes derive from this same interface keeps things + * tidy. + */ + +public interface ColumnProjection { + + public static final int FRAMEWORK_BASE_ID = 100; + public static final int READER_BASE_ID = 200; + + /** + * The name of the column as it appears in the output + * row (record batch). + * + * @return the output column name + */ + String name(); + + /** + * A node type, unique to each kind of node. Nodes defined in this package + * use IDs less than {@link #FRAMEWORK_BASE_ID}. Nodes defined by + * frameworks (for file metadata columns or for other special + * columns) start numbering with {@link #FRAMEWORK_BASE_ID}. Readers + * may need their own specialized nodes, which must use IDs starting + * with {@link #READER_BASE_ID}. + * <p> + * This system solves two problems: + * <ol> + * <li>Provides an efficient way for each mechanism to recognize its + * own nodes without using <code>instanceof</code>.</li> + * <li>Allows for frameworks and readers to be added without changing + * any base enum. This works because every instance of this mechanism + * sees only the base nodes, those from a single framework and those + * from a single reader; there is no need for a universal ID registry. + * Two frameworks can use identical IDs because they never mix. + * Similarly, two readers can use the same IDs because Drill does not + * allow a single scan operator to use two different kinds of readers.
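 + * <p>
 + * As a sketch (hypothetical constant names, not defined in this
 + * patch), a framework and a reader might number their nodes as:
 + * <pre><code>
 + *   // Framework-defined node types start at FRAMEWORK_BASE_ID.
 + *   public static final int FILE_METADATA_ID = ColumnProjection.FRAMEWORK_BASE_ID + 1;
 + *   // Reader-defined node types start at READER_BASE_ID.
 + *   public static final int MY_READER_COL_ID = ColumnProjection.READER_BASE_ID + 1;
 + * </code></pre>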
+ * </li> + * </ol> + * @return the numeric ID for this node, used for each layer to + * recognize its own nodes + */ + int nodeType(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java new file mode 100644 index 000000000..6d2e5d121 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ConstantColumnLoader.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.List; + +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.accessor.TupleWriter; + +/** + * Populate metadata columns: either file metadata (AKA "implicit + * columns") or directory metadata (AKA "partition columns"). In both + * cases the column type is nullable Varchar and the column value + * is predefined by the projection planner; this class just copies + * that value into each row. + * <p> + * The values for the columns appear in the column definitions. + * This works because the metadata columns are re-resolved + * for each file, picking up file-specific values. In some cases, + * a column might not even have a value (such as a reference to + * a dirN level that isn't defined for a particular file). + * <p> + * That said, this class is agnostic about the source and meaning + * of the columns: it only cares that the columns are of type + * nullable Varchar and that the values are in the column nodes. + */ + +public class ConstantColumnLoader extends StaticColumnLoader { + + public interface ConstantColumnSpec { + String name(); + MaterializedField schema(); + String value(); + } + + private final String values[]; + private final List<? extends ConstantColumnSpec> constantCols; + + public ConstantColumnLoader(ResultVectorCache vectorCache, + List<? extends ConstantColumnSpec> defns) { + super(vectorCache); + + // Populate the loader schema from that provided. + // Cache values for faster access.
+ // TODO: Rewrite to specify schema up front + + constantCols = defns; + RowSetLoader schema = loader.writer(); + values = new String[defns.size()]; + for (int i = 0; i < defns.size(); i++) { + ConstantColumnSpec defn = defns.get(i); + values[i] = defn.value(); + schema.addColumn(defn.schema()); + } + } + + /** + * Populate static vectors with the defined static values. + * + * @param rowCount number of rows to generate. Must match the + * row count in the batch returned by the reader + */ + + @Override + public VectorContainer load(int rowCount) { + loader.startBatch(); + RowSetLoader writer = loader.writer(); + for (int i = 0; i < rowCount; i++) { + writer.start(); + loadRow(writer); + writer.save(); + } + return loader.harvest(); + } + + /** + * Populate one row with the defined static values. + * + * @param writer writer used to set the value of each + * column in the current row + */ + + private void loadRow(TupleWriter writer) { + for (int i = 0; i < values.length; i++) { + + // Set the column (of any type) to null if the string value + // is null. + + if (values[i] == null) { + writer.scalar(i).setNull(); + } else { + // Else, set the static (string) value. + + writer.scalar(i).setString(values[i]); + } + } + } + + public List<? extends ConstantColumnSpec> columns() { return constantCols; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java new file mode 100644 index 000000000..41cc59582 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ExplicitSchemaProjection.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.List; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.project.RequestedTuple; +import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +/** + * Perform a schema projection for the case of an explicit list of + * projected columns. Example: SELECT a, b, c. + * <p> + * An explicit projection starts with the requested set of columns, + * then looks in the table schema to find matches. That is, it is + * driven by the query itself. + * <p> + * An explicit projection may include columns that do not exist in + * the source schema. In this case, we fill in null columns for + * unmatched projections.
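 + * <p>
 + * For example (illustrative): given the project list (a, b, c) and a
 + * table schema of (a, c, d), the output is (a, b, c), where a and c
 + * are table columns, b is a null column, and d is unprojected.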
+ */ + +public class ExplicitSchemaProjection extends SchemaLevelProjection { + + public ExplicitSchemaProjection(ScanLevelProjection scanProj, + TupleMetadata tableSchema, + ResolvedTuple rootTuple, + List<SchemaProjectionResolver> resolvers) { + super(resolvers); + resolveRootTuple(scanProj, rootTuple, tableSchema); + } + + private void resolveRootTuple(ScanLevelProjection scanProj, + ResolvedTuple rootTuple, + TupleMetadata tableSchema) { + for (ColumnProjection col : scanProj.columns()) { + if (col.nodeType() == UnresolvedColumn.UNRESOLVED) { + resolveColumn(rootTuple, ((UnresolvedColumn) col).element(), tableSchema); + } else { + resolveSpecial(rootTuple, col, tableSchema); + } + } + } + + private void resolveColumn(ResolvedTuple outputTuple, + RequestedColumn inputCol, TupleMetadata tableSchema) { + int tableColIndex = tableSchema.index(inputCol.name()); + if (tableColIndex == -1) { + resolveNullColumn(outputTuple, inputCol); + } else { + resolveTableColumn(outputTuple, inputCol, + tableSchema.metadata(tableColIndex), + tableColIndex); + } + } + + private void resolveTableColumn(ResolvedTuple outputTuple, + RequestedColumn requestedCol, + ColumnMetadata column, int sourceIndex) { + + // Is the requested column implied to be a map? + // A requested column is a map if the user requests x.y and we + // are resolving column x. The presence of y as a member implies + // that x is a map. + + if (requestedCol.isTuple()) { + resolveMap(outputTuple, requestedCol, column, sourceIndex); + } + + // Is the requested column implied to be an array? + // This occurs when the projection list contains at least one + // array index reference such as x[10]. + + else if (requestedCol.isArray()) { + resolveArray(outputTuple, requestedCol, column, sourceIndex); + } + + // A plain old column. Might be an array or a map, but if + // so, the request list just mentions it by name without implying + // the column type. That is, the project list just contains x + // by itself. + + else { + projectTableColumn(outputTuple, requestedCol, column, sourceIndex); + } + } + + private void resolveMap(ResolvedTuple outputTuple, + RequestedColumn requestedCol, ColumnMetadata column, + int sourceIndex) { + + // If the actual column isn't a map, then the request is invalid. + + if (! column.isMap()) { + throw UserException + .validationError() + .message("Project list implies a map column, but actual column is not a map") + .addContext("Projected column", requestedCol.fullName()) + .addContext("Actual type", column.type().name()) + .build(logger); + } + + // The requested column is implied to be a map because it lists + // members to project. Project these. + + ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple, + column.schema(), sourceIndex); + resolveTuple(mapCol.members(), requestedCol.mapProjection(), + column.mapSchema()); + + // If the projection is simple, then just project the map column + // as is. A projection is simple if all map columns from the table + // are projected, and no null columns are needed. The simple case + // occurs more often than one might expect because the result set + // loader only projected those columns that were needed, so the only + // issue we have to handle is null columns. + // + // In the simple case, we discard the map tuple just created + // since we ended up not needing it. 
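 + //
 + // For example (illustrative): SELECT m.a, m.b against a table in
 + // which map `m` contains exactly (a, b) is a simple projection;
 + // the map is projected as a whole and the tuple built above is
 + // discarded.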
+ + if (mapCol.members().isSimpleProjection()) { + outputTuple.removeChild(mapCol.members()); + projectTableColumn(outputTuple, requestedCol, column, sourceIndex); + } + + // The resolved tuple may have a subset of table columns + // and/or null columns. Project a new map that will be created + // to hold the projected map elements. + + else { + outputTuple.add(mapCol); + } + } + + private void resolveTuple(ResolvedTuple mapTuple, + RequestedTuple requestedTuple, TupleMetadata mapSchema) { + for (RequestedColumn col : requestedTuple.projections()) { + resolveColumn(mapTuple, col, mapSchema); + } + } + + private void resolveArray(ResolvedTuple outputTuple, + RequestedColumn requestedCol, ColumnMetadata column, + int sourceIndex) { + + // If the actual column isn't an array or list, + // then the request is invalid. + + if (column.type() != MinorType.LIST && ! column.isArray()) { + throw UserException + .validationError() + .message("Project list implies an array, but actual column is not an array") + .addContext("Projected column", requestedCol.fullName()) + .addContext("Actual cardinality", column.mode().name()) + .build(logger); + } + + // The project operator will do the actual array element + // projection. + + projectTableColumn(outputTuple, requestedCol, column, sourceIndex); + } + + /** + * Project a column to the specified output tuple. The name comes from the + * project list. (If the actual column name is `X` (upper case), but the + * project list requests `x` (lower case), project the column using the + * lower-case name.) The column type comes from the table column. The source + * index is the location in the table map or row. + * + * @param outputTuple + * projected tuple being built + * @param requestedCol + * column as requested in the project list + * @param column + * metadata for the actual table column + * @param sourceIndex + * index of the column within the table tuple (implies the location + * of the table vector to be projected) + */ + + private void projectTableColumn(ResolvedTuple outputTuple, + RequestedColumn requestedCol, + ColumnMetadata column, int sourceIndex) { + outputTuple.add( + new ResolvedTableColumn(requestedCol.name(), + MaterializedField.create(requestedCol.name(), + column.majorType()), + outputTuple, sourceIndex)); + } + + /** + * Resolve a null column. This is a projected column which does not match + * an implicit or table column. We consider two cases: a simple top-level + * column reference ("a", say) and an implied map reference ("a.b", say.) + * If the column appears to be a map, determine the set of children, which + * may appear at any depth, that were requested. + */ + + private void resolveNullColumn(ResolvedTuple outputTuple, + RequestedColumn requestedCol) { + ResolvedColumn nullCol; + if (requestedCol.isTuple()) { + nullCol = resolveMapMembers(outputTuple, requestedCol); + } else { + nullCol = outputTuple.nullBuilder.add(requestedCol.name()); + } + outputTuple.add(nullCol); + } + + /** + * A child column of a map is not projected. Recurse to determine the full + * set of nullable child columns.
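 + * <p>
 + * For example (illustrative): if the project list contains m.a and
 + * m.b.c but map `m` does not exist in the table, the result is a
 + * null map `m` containing null column `a` and a nested null map `b`
 + * that contains null column `c`.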
+ * + * @param outputTuple the tuple that is to hold the null map column + * @param col the projected column, implied to be a map + * @return a null map column whose members are null markers for the + * requested children + */ + + private ResolvedColumn resolveMapMembers(ResolvedTuple outputTuple, RequestedColumn col) { + ResolvedMapColumn mapCol = new ResolvedMapColumn(outputTuple, col.name()); + ResolvedTuple members = mapCol.members(); + for (RequestedColumn child : col.mapProjection().projections()) { + if (child.isTuple()) { + members.add(resolveMapMembers(members, child)); + } else { + members.add(outputTuple.nullBuilder.add(child.name())); + } + } + return mapCol; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java new file mode 100644 index 000000000..fe1e0240a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/MetadataManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; +import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; + +/** + * Queries can contain a wildcard (*), table columns, or special + * system-defined columns (the file metadata columns AKA implicit + * columns, the `columns` column of CSV, etc.). + * <p> + * This class provides a generalized way of handling such extended + * columns. That is, this handles metadata for columns defined by + * the scan or file; columns defined by the table (the actual + * data metadata) are handled elsewhere. + * <p> + * Objects of this interface are driven by the projection processing + * framework which provides a vector cache from which to obtain + * materialized columns. The implementation must provide a projection + * parser to pick out the columns which this object handles. + * <p> + * A better name might be ImplicitMetadataManager to signify that + * this is about metadata other than table columns. + */ +public interface MetadataManager { + + void bind(ResultVectorCache vectorCache); + + ScanProjectionParser projectionParser(); + + SchemaProjectionResolver resolver(); + + /** + * Define (materialize) the columns which this manager + * represents. + */ + void define(); + + /** + * Load data into the custom columns, if needed (such as for + * null or implicit columns). + * + * @param rowCount number of rows read into a batch. + */ + void load(int rowCount); + + /** + * Event indicating the end of a file (or other data source). + */ + void endFile(); + + /** + * Event indicating the end of a scan.
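 + * <p>
 + * As a sketch of the overall lifecycle implied by these methods:
 + * {@link #bind} once with the vector cache, {@link #projectionParser}
 + * during scan-level projection, {@link #define} once columns are
 + * resolved, {@link #load} once per batch, {@link #endFile} at the
 + * end of each file, and finally this method at the end of the scan.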
+ */ + void close(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java new file mode 100644 index 000000000..ec95de8a7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NoOpMetadataManager.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; +import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; + +/** + * Do-nothing implementation of the metadata manager. Allows the + * metadata manager to be optional without needing an if-statement + * on every access. + */ + +public class NoOpMetadataManager implements MetadataManager { + + @Override + public void bind(ResultVectorCache vectorCache) { } + + @Override + public ScanProjectionParser projectionParser() { return null; } + + @Override + public SchemaProjectionResolver resolver() { + // The resolver is requested only for user-defined metadata + // managers, not for this default, no-op version. If this + // method is called, something is amiss with the default + // setup. + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public void define() { } + + @Override + public void load(int rowCount) { } + + @Override + public void endFile() { } + + @Override + public void close() { } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java new file mode 100644 index 000000000..7e9d2fdd4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnBuilder.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader.NullColumnSpec; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; + +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; + +/** + * Manages null columns by creating a null column loader for each + * set of non-empty null columns. This class acts as a scan-wide + * facade around the per-schema null column loader. + */ + +public class NullColumnBuilder implements VectorSource { + + /** + * Creates null columns if needed. + */ + + protected final List<NullColumnSpec> nullCols = new ArrayList<>(); + private NullColumnLoader nullColumnLoader; + private VectorContainer outputContainer; + + /** + * The reader-specified null type if other than the default. + */ + + private final MajorType nullType; + private final boolean allowRequiredNullColumns; + + public NullColumnBuilder( + MajorType nullType, boolean allowRequiredNullColumns) { + this.nullType = nullType; + this.allowRequiredNullColumns = allowRequiredNullColumns; + } + + public NullColumnBuilder newChild() { + return new NullColumnBuilder(nullType, allowRequiredNullColumns); + } + + public ResolvedNullColumn add(String name) { + return add(name, null); + } + + public ResolvedNullColumn add(String name, MajorType type) { + final ResolvedNullColumn col = new ResolvedNullColumn(name, type, this, nullCols.size()); + nullCols.add(col); + return col; + } + + public void build(ResultVectorCache vectorCache) { + close(); + + // If no null columns for this schema, no need to create + // the loader. + + if (hasColumns()) { + nullColumnLoader = new NullColumnLoader(vectorCache, nullCols, nullType, allowRequiredNullColumns); + outputContainer = nullColumnLoader.output(); + } + } + + public boolean hasColumns() { + return nullCols != null && ! 
nullCols.isEmpty(); + } + + public void load(int rowCount) { + if (nullColumnLoader != null) { + final VectorContainer output = nullColumnLoader.load(rowCount); + assert output == outputContainer; + } + } + + @Override + public ValueVector vector(int index) { + return outputContainer.getValueVector(index).getValueVector(); + } + + @VisibleForTesting + public VectorContainer output() { return outputContainer; } + + public void close() { + if (nullColumnLoader != null) { + nullColumnLoader.close(); + nullColumnLoader = null; + } + if (outputContainer != null) { + outputContainer.clear(); + outputContainer = null; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java new file mode 100644 index 000000000..0c36cb99d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/NullColumnLoader.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.RowSetLoader; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; + +/** + * Create and populate null columns for the case in which a SELECT statement + * refers to columns that do not exist in the actual table. Nullable and array + * types are suitable for null columns. (Drill defines an empty array as the + * same as a null array: not true, but the best we have at present.) Required + * types cannot be used as we don't know what value to set into the column + * values. + * <p> + * Seeks to preserve "vector continuity" by reusing vectors when possible. + * Cases: + * <ul> + * <li>A column a was available in a prior reader (or batch), but is no longer + * available, and is thus null. Reuses the type and vector of the prior reader + * (or batch) to prevent trivial schema changes.</li> + * <li>A column has an implied type (specified in the metadata about the + * column provided by the reader.) That type information is used instead of + * the defined null column type.</li> + * <li>A column has no type information. The type becomes the null column type + * defined by the reader (or nullable int by default).</li> + * <li>Required columns are not suitable. If any of the above found a required + * type, convert the type to nullable.</li> + * <li>The resulting column and type, whatever it turned out to be, is placed + * into the vector cache so that it can be reused by the next reader or batch, + * to again preserve vector continuity.</li> + * </ul> + * The above rules eliminate "trivial" schema changes, but can still result in + * "hard" schema changes if a required type is replaced by a nullable type. + */ + +public class NullColumnLoader extends StaticColumnLoader { + + public interface NullColumnSpec { + String name(); + MajorType type(); + void setType(MajorType type); + } + + public static final MajorType DEFAULT_NULL_TYPE = MajorType.newBuilder() + .setMinorType(MinorType.INT) + .setMode(DataMode.OPTIONAL) + .build(); + + private final MajorType nullType; + private final boolean allowRequired; + private final boolean isArray[]; + + public NullColumnLoader(ResultVectorCache vectorCache, List<? extends NullColumnSpec> defns, + MajorType nullType, boolean allowRequired) { + super(vectorCache); + + // Normally, null columns must be optional or arrays. However + // we allow required columns either if the client requests it, + // or if the client's requested null type is itself required. + // (The generated "null column" vectors will go into the vector + // cache and be pulled back out; we must preserve the required + // mode in this case.) + + this.allowRequired = allowRequired || + (nullType != null && nullType.getMode() == DataMode.REQUIRED); + + // Use the provided null type, else the standard nullable int. + + if (nullType == null) { + this.nullType = DEFAULT_NULL_TYPE; + } else { + this.nullType = nullType; + } + + // Populate the loader schema from that provided + + RowSetLoader schema = loader.writer(); + isArray = new boolean[defns.size()]; + for (int i = 0; i < defns.size(); i++) { + NullColumnSpec defn = defns.get(i); + MaterializedField colSchema = selectType(defn); + isArray[i] = colSchema.getDataMode() == DataMode.REPEATED; + schema.addColumn(colSchema); + } + } + + /** + * Implements the type mapping algorithm; preferring the best fit + * to preserve the schema, else resorting to changes when needed. + * @param defn output column definition + * @return type of the empty column that implements the definition + */ + + private MaterializedField selectType(NullColumnSpec defn) { + + // Prefer the type of any previous occurrence of + // this column. + + MajorType type = vectorCache.getType(defn.name()); + + // Else, use the type defined in the projection, if any. + + if (type == null) { + type = defn.type(); + } + if (type != null && ! allowRequired && type.getMode() == DataMode.REQUIRED) { + + // The type (from the cache or the projection) is required. + // The client determines whether to map required types to optional. + // The text readers use required Varchar columns for missing columns, + // and so no required-to-optional mapping is desired. Other readers + // want to use nulls for missing columns, and so need the + // required-to-nullable mapping. + + type = MajorType.newBuilder() + .setMinorType(type.getMinorType()) + .setMode(DataMode.OPTIONAL) + .build(); + } + + // Else, use the specified null type. + + if (type == null) { + type = nullType; + } + + // If the schema had the special NULL type, replace it with the + // null column type.
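 + //
 + // Summary of the precedence implemented above: vector cache type
 + // first, then the column's own declared type, then the configured
 + // null type (nullable INT unless overridden), with REQUIRED modes
 + // converted to OPTIONAL unless required types are explicitly allowed.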
+ + if (type.getMinorType() == MinorType.NULL) { + type = nullType; + } + defn.setType(type); + return MaterializedField.create(defn.name(), type); + } + + public VectorContainer output() { + return loader.outputContainer(); + } + + @Override + public VectorContainer load(int rowCount) { + loader.startBatch(); + loader.skipRows(rowCount); + return loader.harvest(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java new file mode 100644 index 000000000..a658629f2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedColumn.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.record.MaterializedField; + +/** + * A resolved column has a name, and a specification for how to project + * data from a source vector to a vector in the final output container. + * Describes the projection of a single column from + * an input to an output batch. + * <p> + * Although the table schema mechanism uses the newer "metadata" + * mechanism, resolved columns revert back to the original + * {@link MajorType} and {@link MaterializedField} mechanism used + * by the rest of Drill. Doing so loses a bit of additional + * information, but at present there is no way to export that information + * along with a serialized record batch; each operator must rediscover + * it after deserialization. + */ + +public abstract class ResolvedColumn implements ColumnProjection { + + public final VectorSource source; + public final int sourceIndex; + + public ResolvedColumn(VectorSource source, int sourceIndex) { + this.source = source; + this.sourceIndex = sourceIndex; + } + + public VectorSource source() { return source; } + + public int sourceIndex() { return sourceIndex; } + + /** + * Return the type of this column. Used primarily by the schema smoothing + * mechanism. 
+ * + * @return the MaterializedField representation of this column + */ + + public abstract MaterializedField schema(); + + public void project(ResolvedTuple dest) { + dest.addVector(source.vector(sourceIndex)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java new file mode 100644 index 000000000..4a158f7b0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedMapColumn.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.common.types.Types; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedMap; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedMapArray; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedSingleMap; +import org.apache.drill.exec.record.MaterializedField; + +/** + * Represents a column which is implicitly a map (because it has children + * in the project list), but which does not match any column in the table. + * This kind of column gives rise to a map of null columns in the output. + */ + +public class ResolvedMapColumn extends ResolvedColumn { + + public static final int ID = 17; + + private final MaterializedField schema; + private final ResolvedTuple parent; + private final ResolvedMap members; + + public ResolvedMapColumn(ResolvedTuple parent, String name) { + super(parent, -1); + schema = MaterializedField.create(name, + Types.required(MinorType.MAP)); + this.parent = parent; + members = new ResolvedSingleMap(this); + parent.addChild(members); + } + + public ResolvedMapColumn(ResolvedTuple parent, + MaterializedField schema, int sourceIndex) { + super(parent, sourceIndex); + this.schema = schema; + this.parent = parent; + + // This column corresponds to an input map. + // We may have to create a matching new output + // map. Determine whether it is a single map or + // an array. 
+ + assert schema.getType().getMinorType() == MinorType.MAP; + if (schema.getType().getMode() == DataMode.REPEATED) { + members = new ResolvedMapArray(this); + } else { + members = new ResolvedSingleMap(this); + } + parent.addChild(members); + } + + public ResolvedTuple parent() { return parent; } + + @Override + public String name() { return schema.getName(); } + + @Override + public int nodeType() { return ID; } + + public ResolvedTuple members() { return members; } + + @Override + public void project(ResolvedTuple dest) { + dest.addVector(members.buildMap()); + } + + @Override + public MaterializedField schema() { return schema; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java new file mode 100644 index 000000000..7694b7e71 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedNullColumn.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader.NullColumnSpec; +import org.apache.drill.exec.record.MaterializedField; + +/** + * Projected column that serves as both a resolved column (provides projection + * mapping) and a null column spec (provides the information needed to create + * the required null vectors.) + */ + +public class ResolvedNullColumn extends ResolvedColumn implements NullColumnSpec { + + public static final int ID = 4; + + private final String name; + private MajorType type; + + public ResolvedNullColumn(String name, MajorType type, VectorSource source, int sourceIndex) { + super(source, sourceIndex); + this.name = name; + this.type = type; + } + + @Override + public String name() { return name; } + + @Override + public MajorType type() { return type; } + + @Override + public int nodeType() { return ID; } + + @Override + public void setType(MajorType type) { + + // Update the actual type based on what the null-column + // mechanism chose for this column. 
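 + // (See NullColumnLoader.selectType(), which may substitute a type
 + // found in the vector cache or the configured null type.)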
+ + this.type = type; + } + + @Override + public MaterializedField schema() { + return MaterializedField.create(name, type); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java new file mode 100644 index 000000000..f17802a6c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTableColumn.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.record.MaterializedField; + +/** + * Column that matches one provided by the table. Provides the data type + * of that column and information to project from the result set loader + * output container to the scan output container. (Note that the result + * set loader container is, itself, a projection from the actual table + * schema to the desired set of columns; but in the order specified + * by the table.) + */ + +public class ResolvedTableColumn extends ResolvedColumn { + + public static final int ID = 3; + + public final String projectedName; + public final MaterializedField schema; + + public ResolvedTableColumn(String projectedName, + MaterializedField schema, + VectorSource source, int sourceIndex) { + super(source, sourceIndex); + this.projectedName = projectedName; + this.schema = schema; + } + + @Override + public String name() { return projectedName; } + + @Override + public MaterializedField schema() { return schema; } + + @Override + public int nodeType() { return ID; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf + .append("[") + .append(getClass().getSimpleName()) + .append(" name=") + .append(name()) + .append("]"); + return buf.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java new file mode 100644 index 000000000..c6e15106b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector; + +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; + +/** + * Drill rows are made up of a tree of tuples, with the row being the root + * tuple. Each tuple contains columns, some of which may be maps. This + * class represents each row or map in the output projection. + * <p> + * Output columns within the tuple can be projected from the data source, + * might be null (requested columns that don't match a data source column) + * or might be a constant (such as an implicit column.) This class + * orchestrates assembling an output tuple from a collection of these + * three column types. (Though implicit columns appear only in the root + * tuple.) + * + * <h4>Null Handling</h4> + * + * The project list might reference a "missing" map if the project list + * includes, say, <tt>SELECT a.b.c</tt> but <tt>`a`</tt> does not exist + * in the data source. In this case, the column a is implied to be a map, + * so the projection mechanism will create a null map for <tt>`a`</tt> + * and <tt>`b`</tt>, and will create a null column for <tt>`c`</tt>. + * <p> + * To accomplish this recursive null processing, each tuple is associated + * with a null builder. (The null builder can be null if projection is + * implicit with a wildcard; in such a case no null columns can occur. + * But, even here, with schema persistence, a <tt>SELECT *</tt> query + * may need null columns if a second file does not contain a column + * that appeared in a first file.) + * <p> + * The null builder is bound to each tuple to allow vector persistence + * via the result vector cache. If we must create a null column + * <tt>`x`</tt> in two different readers, then the rules of Drill + * require that the same vector be used for both (or else a schema + * change is signaled.) The vector cache works by name (and type). + * Since maps may contain columns with the same names as other maps, + * the vector cache must be associated with each tuple. And, by extension, + * the null builder must also be associated with each tuple. + * + * <h4>Lifecycle</h4> + * + * The lifecycle of a resolved tuple is: + * <ul> + * <li>The projection mechanism creates the output tuple, and its columns, + * by comparing the project list against the table schema. 
The result is + * a set of table, null, or constant columns.</li> + * <li>Once per schema change, the resolved tuple creates the output + * tuple by linking to vectors in their original locations. As it turns out, + * we can simply share the vectors; we don't need to transfer the buffers.</li> + * <li>To prepare for the transfer, the tuple asks the null column builder + * (if present) to build the required null columns.</li> + * <li>Once the output tuple is built, it can be used for any number of + * batches without further work. (The same vectors appear in the various inputs + * and the output, eliminating the need for any transfers.)</li> + * <li>Once per batch, the client must set the row count. This is needed for the + * output container, and for any "null" maps that the project may have created.</li> + * </ul> + * + * <h4>Projection Mapping</h4> + * + * Each column is mapped into the output tuple (vector container or map) in + * the order that the columns are defined here. (That order follows the project + * list for explicit projection, or the table schema for implicit projection.) + * The source, however, may be in any order (at least for the table schema). + * A projection mechanism identifies the {@link VectorSource} that supplies the + * vector for the column, along with the vector's index within that source. + * The resolved tuple is bound to an output tuple. The projection mechanism + * grabs the input vector from the vector source at the indicated position, and + * links it into the output tuple, represented by this projected tuple, at the + * position of the resolved column in the child list. + * + * <h4>Caveats</h4> + * + * The project mechanism handles nested "missing" columns as mentioned + * above. This works to create null columns within maps that are defined by the + * data source. However, the mechanism does not currently handle creating null + * columns within repeated maps or lists. Doing so is possible, but requires + * adding a level of cardinality computation to create the proper number of + * "inner" values. + */ + +public abstract class ResolvedTuple implements VectorSource { + + /** + * Represents the top-level tuple which is projected to a + * vector container. + */ + + public static class ResolvedRow extends ResolvedTuple { + + private VectorContainer input; + private VectorContainer output; + + public ResolvedRow(NullColumnBuilder nullBuilder) { + super(nullBuilder); + } + + public void project(VectorContainer input, VectorContainer output) { + this.input = input; + this.output = output; + output.removeAll(); + buildColumns(); + output.buildSchema(SelectionVectorMode.NONE); + } + + @Override + public ValueVector vector(int index) { + return input.getValueVector(index).getValueVector(); + } + + @Override + public void addVector(ValueVector vector) { + output.add(vector); + } + + @Override + public void setRowCount(int rowCount) { + output.setRecordCount(rowCount); + cascadeRowCount(rowCount); + } + + @Override + public BufferAllocator allocator() { + return output.getAllocator(); + } + + @Override + public String name() { + + // Never used in production, just for debugging. + + return "$root$"; + } + + public VectorContainer output() { return output; } + + @Override + public int innerCardinality(int rowCount) { return rowCount; } + } + + /** + * Represents a map implied by the project list, whether or not the map + * actually appears in the table schema. + * The column is implied to be a map because it contains + * children.
This implementation builds the map and its children. + */ + + public static abstract class ResolvedMap extends ResolvedTuple { + + protected final ResolvedMapColumn parentColumn; + protected AbstractMapVector inputMap; + protected AbstractMapVector outputMap; + + public ResolvedMap(ResolvedMapColumn parentColumn) { + super(parentColumn.parent().nullBuilder == null ? null : parentColumn.parent().nullBuilder.newChild()); + this.parentColumn = parentColumn; + } + + @Override + public void addVector(ValueVector vector) { + outputMap.putChild(vector.getField().getName(), vector); + } + + @Override + public ValueVector vector(int index) { + assert inputMap != null; + return inputMap.getChildByOrdinal(index); + } + + public AbstractMapVector buildMap() { + if (parentColumn.sourceIndex() != -1) { + final ResolvedTuple parentTuple = parentColumn.parent(); + inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex()); + } + final MaterializedField colSchema = parentColumn.schema(); + outputMap = createMap(inputMap, + MaterializedField.create( + colSchema.getName(), colSchema.getType()), + parentColumn.parent().allocator()); + buildColumns(); + return outputMap; + } + + protected abstract AbstractMapVector createMap(AbstractMapVector inputMap, + MaterializedField create, BufferAllocator allocator); + + @Override + public BufferAllocator allocator() { + return outputMap.getAllocator(); + } + + @Override + public String name() { return parentColumn.name(); } + } + + public static class ResolvedSingleMap extends ResolvedMap { + + public ResolvedSingleMap(ResolvedMapColumn parentColumn) { + super(parentColumn); + } + + @Override + protected AbstractMapVector createMap(AbstractMapVector inputMap, + MaterializedField schema, BufferAllocator allocator) { + return new MapVector(schema, + allocator, null); + } + + @Override + public void setRowCount(int rowCount) { + ((MapVector) outputMap).setMapValueCount(rowCount); + cascadeRowCount(rowCount); + } + + @Override + public int innerCardinality(int outerCardinality) { + return outerCardinality; + } + } + + /** + * Represents a map tuple (not the map column, rather the value of the + * map column.) When projecting, we create a new repeated map vector, + * but share the offsets vector from input to output. The size of the + * offset vector reveals the number of elements in the "inner" array, + * which is the number of null values to create if null columns are + * added. + */ + + public static class ResolvedMapArray extends ResolvedMap { + + private int valueCount; + + public ResolvedMapArray(ResolvedMapColumn parentColumn) { + super(parentColumn); + } + + @Override + protected AbstractMapVector createMap(AbstractMapVector inputMap, + MaterializedField schema, BufferAllocator allocator) { + + // Create a new map array, reusing the offset vector from + // the original input map. 
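 + // (Sharing the offsets vector avoids copying the row-to-element
 + // mapping; its value count also supplies the inner cardinality used
 + // when populating any null columns added to the map.)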
+ + final RepeatedMapVector source = (RepeatedMapVector) inputMap; + final UInt4Vector offsets = source.getOffsetVector(); + valueCount = offsets.getAccessor().getValueCount(); + return new RepeatedMapVector(schema, + offsets, null); + } + + @Override + public int innerCardinality(int outerCardinality) { + return valueCount; + } + + @Override + public void setRowCount(int rowCount) { + cascadeRowCount(valueCount); + } + } + + protected final List<ResolvedColumn> members = new ArrayList<>(); + protected final NullColumnBuilder nullBuilder; + protected List<ResolvedTuple> children; + protected VectorSource binding; + + public ResolvedTuple(NullColumnBuilder nullBuilder) { + this.nullBuilder = nullBuilder; + } + + public NullColumnBuilder nullBuilder() { + return nullBuilder; + } + + public void add(ResolvedColumn col) { + members.add(col); + } + + public void addChild(ResolvedTuple child) { + if (children == null) { + children = new ArrayList<>(); + } + children.add(child); + } + + public void removeChild(ResolvedTuple child) { + assert ! children.isEmpty() && children.get(children.size()-1) == child; + children.remove(children.size()-1); + } + + public boolean isSimpleProjection() { + if (children != null && ! children.isEmpty()) { + return false; + } + for (int i = 0; i < members.size(); i++) { + if (members.get(i).nodeType() == ResolvedNullColumn.ID) { + return false; + } + } + return true; + } + + @VisibleForTesting + public List<ResolvedColumn> columns() { return members; } + + public void buildNulls(ResultVectorCache vectorCache) { + if (nullBuilder != null) { + nullBuilder.build(vectorCache); + } + if (children != null) { + for (final ResolvedTuple child : children) { + child.buildNulls(vectorCache.childCache(child.name())); + } + } + } + + public void loadNulls(int rowCount) { + if (nullBuilder != null) { + nullBuilder.load(rowCount); + } + if (children != null) { + for (final ResolvedTuple child : children) { + child.loadNulls(innerCardinality(rowCount)); + } + } + } + + public abstract int innerCardinality(int outerCardinality); + + /** + * Merge two or more <i>partial batches</i> to produce a final output batch. + * A partial batch is a vertical slice of a batch, such as the set of null + * columns or the set of data columns. + * <p> + * For example, consider + * two partial batches:<pre><code> + * (a, d, e) + * (c, b)</code></pre> + * We may wish to merge them by projecting columns into an output batch + * of the form:<pre><code> + * (a, b, c, d)</code></pre> + * It is not necessary to project all columns from the inputs, but all + * columns in the output must have a projection. + * <p> + * The merger is created once per schema, then can be reused for any + * number of batches. The only restriction is that the partial batches must + * have the same row count so that the final output batch record + * count makes sense. + * <p> + * Merging is done by discarding any data in the output, then exchanging + * the buffers from the input columns to the output, leaving projected + * columns empty. Note that unprojected columns must be cleared by the + * caller. The caller will have figured out which columns to project and + * which not to project. 
+ */
+
+  protected void buildColumns() {
+    for (int i = 0; i < members.size(); i++) {
+      members.get(i).project(this);
+    }
+  }
+
+  public abstract void addVector(ValueVector vector);
+
+  public abstract void setRowCount(int rowCount);
+
+  protected void cascadeRowCount(int rowCount) {
+    if (children == null) {
+      return;
+    }
+    for (final ResolvedTuple child : children) {
+      child.setRowCount(rowCount);
+    }
+  }
+
+  public abstract BufferAllocator allocator();
+
+  public abstract String name();
+
+  /**
+   * During planning, discard a partial plan to allow reusing the same (root) tuple
+   * for multiple projection plans.
+   */
+
+  public void reset() {
+    members.clear();
+    children = null;
+  }
+
+  public void close() {
+    if (nullBuilder != null) {
+      nullBuilder.close();
+    }
+    if (children != null) {
+      for (final ResolvedTuple child : children) {
+        child.close();
+      }
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
new file mode 100644
index 000000000..e20724521
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanLevelProjection.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTupleImpl;
+
+/**
+ * Parses and analyzes the projection list passed to the scanner. The
+ * projection list is per scan, independent of any tables that the
+ * scanner might scan. The projection list is then used as input to the
+ * per-table projection planning.
+ * <p>
+ * In most query engines, this kind of projection analysis is done at
+ * plan time. But, since Drill is schema-on-read, we don't know the
+ * available columns, or their types, until we start scanning a table.
+ * The table may provide the schema up-front, or may discover it as
+ * the read proceeds. Hence, the job here is to make sense of the
+ * project list based on static a-priori information, then to create
+ * a list that can be further resolved against a table schema when it
+ * appears. This gives us two steps:
+ * <ul>
+ * <li>Scan-level projection: this class, which handles schema for the
+ * entire scan operator.</li>
+ * <li>Table-level projection: defined elsewhere, which merges the
+ * table and scan-level projections.
+ * </ul>
+ * <p>
+ * Accepts the inputs needed to
+ * plan a projection, builds the mappings, and constructs the projection
+ * mapping object.
+ * <p>
+ * Builds the per-scan projection plan given a set of projected columns.
+ * Determines the output schema, which columns to project from the data
+ * source, which are metadata, and so on.
+ * <p>
+ * An annoying aspect of SQL is that the projection list (the list of
+ * columns to appear in the output) is specified after the SELECT keyword.
+ * In relational theory, projection is about columns, selection is about
+ * rows...
+ * <p>
+ * Mappings can be based on four primary use cases:
+ * <ul>
+ * <li><tt>SELECT *</tt>: Project all data source columns, whatever they happen
+ * to be. Create columns using names from the data source. The data source
+ * also determines the order of columns within the row.</li>
+ * <li><tt>SELECT columns</tt>: Similar to SELECT * in that it projects all columns
+ * from the data source, in data source order. But, rather than creating
+ * individual output columns for each data source column, creates a single
+ * column which is an array of Varchars which holds the (text form) of
+ * each column as an array element.</li>
+ * <li><tt>SELECT a, b, c, ...</tt>: Project a specific set of columns, identified by
+ * case-insensitive name. The output row uses the names from the SELECT list,
+ * but types from the data source. Columns appear in the row in the order
+ * specified by the SELECT.</li>
+ * <li><tt>SELECT ...</tt>: SELECT nothing, occurs in <tt>SELECT COUNT(*)</tt>
+ * type queries. The provided projection list contains no (table) columns, though
+ * it may contain metadata columns.</li>
+ * </ul>
+ * Names in the SELECT list can reference any of five distinct types of output
+ * columns:
+ * <ul>
+ * <li>Wildcard ("*") column: indicates the place in the projection list to insert
+ * the table columns once found in the table projection plan.</li>
+ * <li>Data source columns: columns from the underlying table. The table
+ * projection planner will determine if the column exists, or must be filled
+ * in with a null column.</li>
+ * <li>The generic data source columns array: <tt>columns</tt>, or optionally
+ * specific members of the <tt>columns</tt> array such as <tt>columns[1]</tt>.</li>
+ * <li>Implicit columns: <tt>fqn</tt>, <tt>filename</tt>, <tt>filepath</tt>
+ * and <tt>suffix</tt>. These reference
+ * parts of the name of the file being scanned.</li>
+ * <li>Partition columns: <tt>dir0</tt>, <tt>dir1</tt>, ...: These reference
+ * parts of the path name of the file.</li>
+ * </ul>
+ *
+ * @see {@link ImplicitColumnExplorer}, the class from which this class
+ * evolved
+ */
+
+public class ScanLevelProjection {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanLevelProjection.class);
+
+  /**
+   * Interface for add-on parsers, avoids the need to create
+   * a single, tightly-coupled parser for all types of columns.
+   * The main parser handles wildcards and assumes the rest of
+   * the columns are table columns. The add-on parser can tag
+   * columns as special, such as to hold metadata.
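+   * <p>
+   * A minimal sketch of an add-on parser; the column name and the
+   * <tt>MyMetadataColumn</tt> class are hypothetical:<pre><code>
+   * public class MyMetadataParser implements ScanProjectionParser {
+   *   private ScanLevelProjection builder;
+   *
+   *   public void bind(ScanLevelProjection builder) { this.builder = builder; }
+   *
+   *   public boolean parse(RequestedColumn inCol) {
+   *     if (inCol.name().equalsIgnoreCase("myMeta")) {
+   *       builder.addMetadataColumn(new MyMetadataColumn(inCol));
+   *       return true;  // Claim the column
+   *     }
+   *     return false;   // Not ours; treated as a table column
+   *   }
+   *
+   *   public void validate() { }
+   *   public void validateColumn(ColumnProjection col) { }
+   *   public void build() { }
+   * }</code></pre>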
+   */
+
+  public interface ScanProjectionParser {
+    void bind(ScanLevelProjection builder);
+    boolean parse(RequestedColumn inCol);
+    void validate();
+    void validateColumn(ColumnProjection col);
+    void build();
+  }
+
+  // Input
+
+  protected final List<SchemaPath> projectionList;
+
+  // Configuration
+
+  protected List<ScanProjectionParser> parsers;
+  private final boolean v1_12MetadataLocation;
+
+  // Internal state
+
+  protected boolean sawWildcard;
+
+  // Output
+
+  protected List<ColumnProjection> outputCols = new ArrayList<>();
+  protected RequestedTuple outputProjection;
+  protected boolean hasWildcard;
+  protected boolean emptyProjection = true;
+
+  /**
+   * Specify the set of columns in the SELECT list. Since the column list
+   * comes from the query planner, assumes that the planner has checked
+   * the list for syntax and uniqueness.
+   *
+   * @param projectionList list of columns in the SELECT list, in SELECT
+   * list order
+   * @param parsers the set of add-on parsers for special columns
+   * @param v1_12MetadataLocation true to use the Drill 1.12 position
+   * for metadata columns
+   */
+  public ScanLevelProjection(List<SchemaPath> projectionList,
+      List<ScanProjectionParser> parsers,
+      boolean v1_12MetadataLocation) {
+    this.projectionList = projectionList;
+    this.parsers = parsers;
+    this.v1_12MetadataLocation = v1_12MetadataLocation;
+    doParse();
+  }
+
+  private void doParse() {
+    outputProjection = RequestedTupleImpl.parse(projectionList);
+
+    for (ScanProjectionParser parser : parsers) {
+      parser.bind(this);
+    }
+    for (RequestedColumn inCol : outputProjection.projections()) {
+      if (inCol.isWildcard()) {
+        mapWildcard(inCol);
+      } else {
+        mapColumn(inCol);
+      }
+    }
+    verify();
+    for (ScanProjectionParser parser : parsers) {
+      parser.build();
+    }
+  }
+
+  public ScanLevelProjection(List<SchemaPath> projectionList,
+      List<ScanProjectionParser> parsers) {
+    this(projectionList, parsers, false);
+  }
+
+  /**
+   * Wildcard is special: add it, then let parsers add any custom
+   * columns that are needed. The order is important: we want custom
+   * columns to follow table columns.
+   */
+
+  private void mapWildcard(RequestedColumn inCol) {
+
+    // Wildcard column: this is a SELECT * query.
+
+    if (sawWildcard) {
+      throw new IllegalArgumentException("Duplicate * entry in project list");
+    }
+
+    // Remember the wildcard position, if we need to insert it.
+    // Ensures that the main wildcard expansion occurs before add-on
+    // columns.
+
+    int wildcardPosn = outputCols.size();
+
+    // Parsers can consume the wildcard. But, all parsers must
+    // have visibility to the wildcard column.
+
+    for (ScanProjectionParser parser : parsers) {
+      if (parser.parse(inCol)) {
+        wildcardPosn = -1;
+      }
+    }
+
+    // Set this flag only after the parser checks.
+
+    sawWildcard = true;
+
+    // If not consumed, put the wildcard column into the projection list as a
+    // placeholder to be filled in later with actual table columns.
+
+    if (wildcardPosn != -1) {
+
+      // Drill 1.1 - 1.11 and Drill 1.13 or later put metadata columns after
+      // data columns. Drill 1.12 moved them before data columns. For testing
+      // and compatibility, the client can request to use the Drill 1.12 position,
+      // though the after-data position is the default.
+      //
+      // Note that the after-data location is much more convenient for the dirx
+      // partition columns since these vary in number across scans within the same query.
+      // By putting them at the end, the index of all other columns remains
+      // constant. Drill 1.12 broke that behavior, but Drill 1.13 restored it.
+      //
+      // This option can be removed in Drill 1.14 after things settle down.
+ + UnresolvedColumn wildcardCol = new UnresolvedColumn(inCol, UnresolvedColumn.WILDCARD); + if (v1_12MetadataLocation) { + outputCols.add(wildcardCol); + } else { + outputCols.add(wildcardPosn, wildcardCol); + } + hasWildcard = true; + emptyProjection = false; + } + } + + /** + * Map the column into one of five categories. + * <ol> + * <li>Star column (to designate SELECT *)</li> + * <li>Partition file column (dir0, dir1, etc.)</li> + * <li>Implicit column (fqn, filepath, filename, suffix)</li> + * <li>Special <tt>columns</tt> column which holds all columns as + * an array.</li> + * <li>Table column. The actual match against the table schema + * is done later.</li> + * </ol> + * + * Actual mapping is done by parser extensions for all but the + * basic cases. + * + * @param inCol the SELECT column + */ + + private void mapColumn(RequestedColumn inCol) { + + // Give the extensions first crack at each column. + // Some may want to "sniff" a column, even if they + // don't fully handle it. + + for (ScanProjectionParser parser : parsers) { + if (parser.parse(inCol)) { + return; + } + } + + // This is a desired table column. + + addTableColumn( + new UnresolvedColumn(inCol, UnresolvedColumn.UNRESOLVED)); + } + + public void addTableColumn(ColumnProjection outCol) { + outputCols.add(outCol); + emptyProjection = false; + } + + public void addMetadataColumn(ColumnProjection outCol) { + outputCols.add(outCol); + } + + /** + * Once all columns are identified, perform a final pass + * over the set of columns to do overall validation. Each + * add-on parser is given an opportunity to do its own + * validation. + */ + + private void verify() { + + // Let parsers do overall validation. + + for (ScanProjectionParser parser : parsers) { + parser.validate(); + } + + // Validate column-by-column. + + for (ColumnProjection outCol : outputCols) { + for (ScanProjectionParser parser : parsers) { + parser.validateColumn(outCol); + } + switch (outCol.nodeType()) { + case UnresolvedColumn.UNRESOLVED: + if (hasWildcard()) { + throw new IllegalArgumentException("Cannot select table columns and * together"); + } + break; + default: + break; + } + } + } + + /** + * Return the set of columns from the SELECT list + * @return the SELECT list columns, in SELECT list order + */ + + public List<SchemaPath> requestedCols() { return projectionList; } + + /** + * The entire set of output columns, in output order. Output order is + * that specified in the SELECT (for an explicit list of columns) or + * table order (for SELECT * queries). + * @return the set of output columns in output order + */ + + public List<ColumnProjection> columns() { return outputCols; } + + public boolean hasWildcard() { return hasWildcard; } + + /** + * Return whether this is a SELECT * query + * @return true if this is a SELECT * query + */ + + public boolean projectAll() { return hasWildcard; } + + /** + * Returns true if the projection list is empty. This usually + * indicates a <tt>SELECT COUNT(*)</tt> query (though the scan + * operator does not have the context to know that an empty + * list does, in fact, imply a count-only query...) 
+ * + * @return true if no table columns are projected, false + * if at least one column is projected (or the query contained + * the wildcard) + */ + + public boolean projectNone() { return emptyProjection; } + + public RequestedTuple rootProjection() { return outputProjection; } + + @Override + public String toString() { + return new StringBuilder() + .append("[") + .append(getClass().getSimpleName()) + .append(" projection=") + .append(outputCols.toString()) + .append("]") + .toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java new file mode 100644 index 000000000..c2b0262fd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; +import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; +import org.apache.drill.exec.physical.rowSet.ResultSetLoader; +import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder; +import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl; +import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Performs projection of a record reader, along with a set of static + * columns, to produce the final "public" result set (record batch) + * for the scan operator. Primarily solve the "vector permanence" + * problem: that the scan operator must present the same set of vectors + * to downstream operators despite the fact that the scan operator hosts + * a series of readers, each of which builds its own result set. + * <p> + * Provides the option to continue a schema from one batch to the next. + * This can reduce spurious schema changes in formats, such as JSON, with + * varying fields. It is not, however, a complete solution as the outcome + * still depends on the order of file scans and the division of files across + * readers. 
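+ * <p>
+ * A minimal sketch of the expected lifecycle, using the methods defined
+ * below (error handling and the read loop are elided):<pre><code>
+ * ScanSchemaOrchestrator orchestrator = new ScanSchemaOrchestrator(allocator);
+ * orchestrator.build(projectionList);
+ * ReaderSchemaOrchestrator reader = orchestrator.startReader();
+ * ResultSetLoader loader = reader.makeTableLoader(tableSchema);
+ * reader.startBatch();
+ * // ... the reader writes rows through the loader ...
+ * reader.endBatch();
+ * VectorContainer output = orchestrator.output();
+ * // ... repeat per batch and per reader, then:
+ * orchestrator.close();</code></pre>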
+ * <p>
+ * Provides the option to infer the schema from the first batch. The "quick path"
+ * to obtain the schema will read one batch, then use that schema as the returned
+ * schema, returning the full batch in the next call to <tt>next()</tt>.
+ *
+ * <h4>Publishing the Final Result Set</h4>
+ *
+ * This class "publishes" a vector container that has the final, projected
+ * form of a scan. The projected schema includes:
+ * <ul>
+ * <li>Columns from the reader.</li>
+ * <li>Static columns, such as implicit or partition columns.</li>
+ * <li>Null columns for items in the select list, but not found in either
+ * of the above two categories.</li>
+ * </ul>
+ * The order of columns is that set by the select list (or, by the reader for
+ * a <tt>SELECT *</tt> query).
+ *
+ * <h4>Schema Handling</h4>
+ *
+ * The mapping handles a variety of cases:
+ * <ul>
+ * <li>An early-schema table (one in which we know the schema and
+ * the schema remains constant for the whole table).</li>
+ * <li>A late schema table (one in which we discover the schema as
+ * we read the table, and where the schema can change as the read
+ * progresses.)<ul>
+ * <li>Late schema table with SELECT * (we want all columns, whatever
+ * they happen to be.)</li>
+ * <li>Late schema with explicit select list (we want only certain
+ * columns when they happen to appear in the input.)</li></ul></li>
+ * </ul>
+ *
+ * <h4>Implementation Overview</h4>
+ *
+ * Major tasks of this class include:
+ * <ul>
+ * <li>Project table columns (change position and/or name).</li>
+ * <li>Insert static and null columns.</li>
+ * <li>Schema smoothing. That is, if table A produces columns (a, b), but
+ * table B produces only (a), use the type of the first table's b column for the
+ * null created for the missing b in table B.</li>
+ * <li>Vector persistence: use the same set of vectors across readers as long
+ * as the reader schema does not cause a "hard" schema change (change in type,
+ * introduction of a new column).</li>
+ * <li>Detection of schema changes (change of type, introduction of a new column
+ * for a <tt>SELECT *</tt> query, change of the projected schema), and reporting
+ * the change downstream.</li>
+ * </ul>
+ * A projection is needed to:
+ * <ul>
+ * <li>Reorder table columns</li>
+ * <li>Select a subset of table columns</li>
+ * <li>Fill in missing select columns</li>
+ * <li>Fill in implicit or partition columns</li>
+ * </ul>
+ * Creates and returns the batch merger that does the projection.
+ *
+ * <h4>Projection</h4>
+ *
+ * To visualize this, assume we have numbered table columns, lettered
+ * implicit, null or partition columns:<pre><code>
+ * [ 1 | 2 | 3 | 4 ]    Table columns in table order
+ * [ A | B | C ]        Static columns
+ * </code></pre>
+ * Now, we wish to project them into select order.
+ * Let's say that the SELECT clause looked like this, with "t"
+ * indicating table columns:<pre><code>
+ * SELECT t2, t3, C, B, t1, A, t2 ...
+ * </code></pre>
+ * Then the projection looks like this:<pre><code>
+ * [ 2 | 3 | C | B | 1 | A | 2 ]
+ * </code></pre>
+ * Often, not all table columns are projected. In this case, the
+ * result set loader presents the full table schema to the reader,
+ * but actually writes only the projected columns. Suppose we
+ * have:<pre><code>
+ * SELECT t3, C, B, t1, A ...
+ * </code></pre>
+ * Then the abbreviated table schema looks like this:<pre><code>
+ * [ 1 | 3 ]</code></pre>
+ * Note that table columns retain their table ordering.
+ * The projection looks like this:<pre><code>
+ * [ 2 | C | B | 1 | A ]
+ * </code></pre>
+ * <p>
+ * The projector is created once per schema, then can be reused for any
+ * number of batches.
+ * <p>
+ * Merging is done in one of two ways, depending on the input source:
+ * <ul>
+ * <li>For the table loader, the merger discards any data in the output,
+ * then exchanges the buffers from the input columns to the output,
+ * leaving projected columns empty. Note that unprojected columns must
+ * be cleared by the caller.</li>
+ * <li>For implicit and null columns, the output vector is identical
+ * to the input vector.</li>
+ * </ul>
+ */
+
+public class ScanSchemaOrchestrator {
+
+  public static final int MIN_BATCH_BYTE_SIZE = 256 * 1024;
+  public static final int MAX_BATCH_BYTE_SIZE = Integer.MAX_VALUE;
+  public static final int DEFAULT_BATCH_ROW_COUNT = 4096;
+  public static final int DEFAULT_BATCH_BYTE_COUNT = ValueVector.MAX_BUFFER_SIZE;
+  public static final int MAX_BATCH_ROW_COUNT = ValueVector.MAX_ROW_COUNT;
+
+  /**
+   * Orchestrates projection tasks for a single reader within the set that the
+   * scan operator manages. Vectors are reused across readers, but via a vector
+   * cache. All other state is distinct between readers.
+   */
+
+  public class ReaderSchemaOrchestrator implements VectorSource {
+
+    private int readerBatchSize;
+    private ResultSetLoaderImpl tableLoader;
+    private int prevTableSchemaVersion = -1;
+
+    /**
+     * Assembles the table, metadata and null columns into the final output
+     * batch to be sent downstream. The key goal of this class is to "smooth"
+     * schema changes in this output batch by absorbing trivial schema changes
+     * that occur across readers.
+     */
+
+    private ResolvedRow rootTuple;
+    private VectorContainer tableContainer;
+
+    public ReaderSchemaOrchestrator() {
+      readerBatchSize = scanBatchRecordLimit;
+    }
+
+    public void setBatchSize(int size) {
+      if (size > 0) {
+        readerBatchSize = Math.min(size, scanBatchRecordLimit);
+      }
+    }
+
+    public ResultSetLoader makeTableLoader(TupleMetadata tableSchema) {
+      OptionBuilder options = new OptionBuilder();
+      options.setRowCountLimit(readerBatchSize);
+      options.setVectorCache(vectorCache);
+      options.setBatchSizeLimit(scanBatchByteLimit);
+
+      // Set up a selection list if available and it is a subset of
+      // table columns. (Only needed for non-wildcard queries.)
+      // The projection list includes all candidate table columns
+      // whether or not they exist in the up-front schema. Handles
+      // the odd case where the reader claims a fixed schema, but
+      // adds a column later.
+
+      if (! scanProj.projectAll()) {
+        options.setProjectionSet(scanProj.rootProjection());
+      }
+      options.setSchema(tableSchema);
+
+      // Create the table loader
+
+      tableLoader = new ResultSetLoaderImpl(allocator, options.build());
+
+      // If a schema is given, create a zero-row batch to announce the
+      // schema downstream in the form of an empty batch.
+
+      if (tableSchema != null) {
+        tableLoader.startEmptyBatch();
+        endBatch();
+      }
+
+      return tableLoader;
+    }
+
+    public boolean hasSchema() {
+      return prevTableSchemaVersion >= 0;
+    }
+
+    public void startBatch() {
+      tableLoader.startBatch();
+    }
+
+    /**
+     * Build the final output batch by projecting columns from the three input sources
+     * to the output batch. First, build the metadata and/or null columns for the
+     * table row count. Then, merge the sources.
+     */
+
+    public void endBatch() {
+
+      // Get the batch results in a container.
+ + tableContainer = tableLoader.harvest(); + + // If the schema changed, set up the final projection based on + // the new (or first) schema. + + if (prevTableSchemaVersion < tableLoader.schemaVersion()) { + reviseOutputProjection(); + } else { + + // Fill in the null and metadata columns. + + populateNonDataColumns(); + } + rootTuple.setRowCount(tableContainer.getRecordCount()); + } + + private void populateNonDataColumns() { + int rowCount = tableContainer.getRecordCount(); + metadataManager.load(rowCount); + rootTuple.loadNulls(rowCount); + } + + /** + * Create the list of null columns by comparing the SELECT list against the + * columns available in the batch schema. Create null columns for those that + * are missing. This is done for the first batch, and any time the schema + * changes. (For early-schema, the projection occurs once as the schema is set + * up-front and does not change.) For a SELECT *, the null column check + * only need be done if null columns were created when mapping from a prior + * schema. + */ + + private void reviseOutputProjection() { + + // Do the table-schema level projection; the final matching + // of projected columns to available columns. + + TupleMetadata tableSchema = tableLoader.harvestSchema(); + if (schemaSmoother != null) { + doSmoothedProjection(tableSchema); + } else if (scanProj.hasWildcard()) { + doWildcardProjection(tableSchema); + } else { + doExplicitProjection(tableSchema); + } + + // Combine metadata, nulls and batch data to form the final + // output container. Columns are created by the metadata and null + // loaders only in response to a batch, so create the first batch. + + rootTuple.buildNulls(vectorCache); + metadataManager.define(); + populateNonDataColumns(); + rootTuple.project(tableContainer, outputContainer); + prevTableSchemaVersion = tableLoader.schemaVersion(); + } + + private void doSmoothedProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow( + new NullColumnBuilder(nullType, allowRequiredNullColumns)); + schemaSmoother.resolve(tableSchema, rootTuple); + } + + /** + * Query contains a wildcard. The schema-level projection includes + * all columns provided by the reader. + */ + + private void doWildcardProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow(null); + new WildcardSchemaProjection(scanProj, + tableSchema, rootTuple, schemaResolvers); + } + + /** + * Explicit projection: include only those columns actually + * requested by the query, which may mean filling in null + * columns for projected columns that don't actually exist + * in the table. + * + * @param tableSchema newly arrived schema + */ + + private void doExplicitProjection(TupleMetadata tableSchema) { + rootTuple = new ResolvedRow( + new NullColumnBuilder(nullType, allowRequiredNullColumns)); + new ExplicitSchemaProjection(scanProj, + tableSchema, rootTuple, + schemaResolvers); + } + + @Override + public ValueVector vector(int index) { + return tableContainer.getValueVector(index).getValueVector(); + } + + public void close() { + RuntimeException ex = null; + try { + if (tableLoader != null) { + tableLoader.close(); + tableLoader = null; + } + } + catch (RuntimeException e) { + ex = e; + } + try { + if (rootTuple != null) { + rootTuple.close(); + rootTuple = null; + } + } + catch (RuntimeException e) { + ex = ex == null ? e : ex; + } + metadataManager.endFile(); + if (ex != null) { + throw ex; + } + } + } + + // Configuration + + /** + * Custom null type, if provided by the operator. 
If
+   * not set, the null type is the Drill default.
+   */
+
+  private MajorType nullType;
+
+  /**
+   * Creates the metadata (file and directory) columns, if needed.
+   */
+
+  private MetadataManager metadataManager;
+  private final BufferAllocator allocator;
+  private int scanBatchRecordLimit = DEFAULT_BATCH_ROW_COUNT;
+  private int scanBatchByteLimit = DEFAULT_BATCH_BYTE_COUNT;
+  private boolean v1_12MetadataLocation;
+  private final List<ScanProjectionParser> parsers = new ArrayList<>();
+
+  /**
+   * List of resolvers used to resolve projection columns for each
+   * new schema. Allows operators to introduce custom functionality
+   * as a plug-in rather than by copying code or subclassing this
+   * mechanism.
+   */
+
+  List<SchemaProjectionResolver> schemaResolvers = new ArrayList<>();
+
+  private boolean useSchemaSmoothing;
+  private boolean allowRequiredNullColumns;
+
+  // Internal state
+
+  /**
+   * Cache used to preserve the same vectors from one output batch to the
+   * next to keep the Project operator happy (which depends on seeing
+   * exactly the same vectors).
+   * <p>
+   * If the Project operator ever changes so that it depends on looking up
+   * vectors rather than vector instances, this cache can be deprecated.
+   */
+
+  private ResultVectorCacheImpl vectorCache;
+  private ScanLevelProjection scanProj;
+  private ReaderSchemaOrchestrator currentReader;
+  private SchemaSmoother schemaSmoother;
+
+  // Output
+
+  private VectorContainer outputContainer;
+
+  public ScanSchemaOrchestrator(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Specify an optional metadata manager. Metadata is a set of constant
+   * columns with per-reader values. For file-based sources, this is usually
+   * the implicit and partition columns; but it could be other items for other
+   * data sources.
+   *
+   * @param metadataMgr the application-specific metadata manager to use
+   * for this scan
+   */
+
+  public void withMetadata(MetadataManager metadataMgr) {
+    metadataManager = metadataMgr;
+    schemaResolvers.add(metadataManager.resolver());
+  }
+
+  /**
+   * Specify a custom batch record count. This is the maximum number of records
+   * per batch for this scan. Readers can adjust this, but the adjustment is capped
+   * at the value specified here.
+   *
+   * @param batchRecordLimit maximum records per batch
+   */
+
+  public void setBatchRecordLimit(int batchRecordLimit) {
+    scanBatchRecordLimit = Math.max(1,
+        Math.min(batchRecordLimit, ValueVector.MAX_ROW_COUNT));
+  }
+
+  public void setBatchByteLimit(int byteLimit) {
+    scanBatchByteLimit = Math.max(MIN_BATCH_BYTE_SIZE,
+        Math.min(byteLimit, MAX_BATCH_BYTE_SIZE));
+  }
+
+  /**
+   * Specify the type to use for null columns in place of the standard
+   * nullable int. This type is used for all missing columns. (Readers
+   * that need per-column control need a different mechanism.)
+   *
+   * @param nullType the type to use for null columns
+   */
+
+  public void setNullType(MajorType nullType) {
+    this.nullType = nullType;
+  }
+
+  /**
+   * Enable schema smoothing: introduces an additional level of schema
+   * resolution each time a schema changes from a reader.
+ * + * @param flag true to enable schema smoothing, false to disable + */ + + public void enableSchemaSmoothing(boolean flag) { + useSchemaSmoothing = flag; + } + + public void allowRequiredNullColumns(boolean flag) { + allowRequiredNullColumns = flag; + } + + public void useDrill1_12MetadataPosition(boolean flag) { + v1_12MetadataLocation = flag; + } + + public void build(List<SchemaPath> projection) { + vectorCache = new ResultVectorCacheImpl(allocator, useSchemaSmoothing); + + // If no metadata manager was provided, create a mock + // version just to keep code simple. + + if (metadataManager == null) { + metadataManager = new NoOpMetadataManager(); + } + metadataManager.bind(vectorCache); + + // Bind metadata manager parser to scan projector. + // A "real" (non-mock) metadata manager will provide + // a projection parser. Use this to tell us that this + // setup supports metadata. + + ScanProjectionParser parser = metadataManager.projectionParser(); + if (parser != null) { + + // For compatibility with Drill 1.12, insert the file metadata + // parser before others so that, in a wildcard query, metadata + // columns appear before others (such as the `columns` column.) + // This is temporary and should be removed once the test framework + // is restored to Drill 1.11 functionality. + + if (v1_12MetadataLocation) { + parsers.add(0, parser); + } else { + parsers.add(parser); + } + } + + // Parse the projection list. + + scanProj = new ScanLevelProjection(projection, parsers, v1_12MetadataLocation); + + if (scanProj.hasWildcard() && useSchemaSmoothing) { + schemaSmoother = new SchemaSmoother(scanProj, schemaResolvers); + } + + // Build the output container. + + outputContainer = new VectorContainer(allocator); + } + + public void addParser(ScanProjectionParser parser) { + parsers.add(parser); + } + + public void addResolver(SchemaProjectionResolver resolver) { + schemaResolvers.add(resolver); + } + + public ReaderSchemaOrchestrator startReader() { + closeReader(); + currentReader = new ReaderSchemaOrchestrator(); + return currentReader; + } + + public boolean isProjectNone() { + return scanProj.projectNone(); + } + + public boolean hasSchema() { + return currentReader != null && currentReader.hasSchema(); + } + + public VectorContainer output() { + return outputContainer; + } + + public void closeReader() { + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } + } + + public void close() { + closeReader(); + if (outputContainer != null) { + outputContainer.clear(); + outputContainer = null; + } + vectorCache.close(); + metadataManager.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java new file mode 100644 index 000000000..c7bae278e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaLevelProjection.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Computes the full output schema given a table (or batch)
+ * schema. Takes the original, unresolved output list from the projection
+ * definition, merges it with the file, directory and table schema information,
+ * and produces a partially or fully resolved output list.
+ * <p>
+ * A "resolved" projection list is a list of concrete columns: table
+ * columns, nulls, file metadata or partition metadata. An unresolved list
+ * contains either table column names not yet matched against the table
+ * schema, or a wildcard column.
+ * <p>
+ * The idea is that the projection list moves through stages of resolution
+ * depending on which information is available. An "early schema" table
+ * provides schema information up front, and so allows fully resolving
+ * the projection list on table open. A "late schema" table allows only a
+ * partially resolved projection list, with the remainder of resolution
+ * happening on the first (or perhaps every) batch.
+ * <p>
+ * Data source (table) schema can be of two forms:
+ * <ul>
+ * <li>Early schema: the schema is known before reading data. A JDBC data
+ * source is an example, as is a CSV reader for a file with headers.</li>
+ * <li>Late schema: the schema is not known until data is read, and is
+ * discovered on the fly. Example: JSON, which declares values as maps
+ * without an up-front schema.</li>
+ * </ul>
+ * These two forms give rise to distinct ways of planning the projection.
+ * <p>
+ * The final result of the projection is a set of "output" columns: a set
+ * of columns that, taken together, defines the row (bundle of vectors) that
+ * the scan operator produces. Columns are ordered: the order specified here
+ * must match the order that columns appear in the result set loader and the
+ * vector container so that code can access columns by index as well as name.
+ *
+ * @see {@link ImplicitColumnExplorer}, the class from which this class
+ * evolved
+ */
+
+public class SchemaLevelProjection {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaLevelProjection.class);
+
+  /**
+   * Schema-level projection is customizable. Implement this interface, and
+   * add an instance to the scan orchestrator, to perform custom mappings
+   * from unresolved columns (perhaps of an extension-specified type) to
+   * final projected columns. The metadata manager, for example, implements
+   * this interface to map metadata columns.
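+   * <p>
+   * A minimal sketch of a custom resolver; the node ID constant and the
+   * resolved column class are hypothetical:<pre><code>
+   * public class MyResolver implements SchemaProjectionResolver {
+   *   public void startResolution() { }
+   *
+   *   public boolean resolveColumn(ColumnProjection col,
+   *       ResolvedTuple tuple, TupleMetadata tableSchema) {
+   *     if (col.nodeType() != MY_NODE_ID) {
+   *       return false;  // Not ours; let another resolver try
+   *     }
+   *     tuple.add(new MyResolvedColumn(col));
+   *     return true;
+   *   }
+   * }</code></pre>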
+   */
+
+  public interface SchemaProjectionResolver {
+    void startResolution();
+    boolean resolveColumn(ColumnProjection col, ResolvedTuple tuple,
+        TupleMetadata tableSchema);
+  }
+
+  protected final List<SchemaProjectionResolver> resolvers;
+
+  protected SchemaLevelProjection(
+      List<SchemaProjectionResolver> resolvers) {
+    if (resolvers == null) {
+      resolvers = new ArrayList<>();
+    }
+    this.resolvers = resolvers;
+    for (SchemaProjectionResolver resolver : resolvers) {
+      resolver.startResolution();
+    }
+  }
+
+  protected void resolveSpecial(ResolvedTuple rootOutputTuple, ColumnProjection col,
+      TupleMetadata tableSchema) {
+    for (SchemaProjectionResolver resolver : resolvers) {
+      if (resolver.resolveColumn(col, rootOutputTuple, tableSchema)) {
+        return;
+      }
+    }
+    throw new IllegalStateException("No resolver for column: " + col.nodeType());
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
new file mode 100644
index 000000000..b15fe9a2a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SchemaSmoother.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Implements a "schema smoothing" algorithm that provides schema
+ * persistence for the wildcard selection (i.e. <tt>SELECT *</tt>).
+ * <p>
+ * Constraints:
+ * <ul>
+ * <li>Adding columns causes a hard schema change.</li>
+ * <li>Removing columns is allowed; use the type from the previous
+ * schema, as long as the previous mode was nullable or repeated.</li>
+ * <li>Changing type or mode causes a hard schema change.</li>
+ * <li>Changing column order is fine; use the order from the previous
+ * schema.</li>
+ * </ul>
+ * This can all be boiled down to a simpler rule:
+ * <ul>
+ * <li>Schema persistence is possible if the output schema
+ * from a prior schema can be reused for the current schema.</li>
+ * <li>Else, a hard schema change occurs and a new output
+ * schema is derived from the new table schema.</li>
+ * </ul>
+ * The core idea here is to "unresolve" a fully-resolved table schema
+ * to produce a new projection list that is the equivalent of using that
+ * prior projection list in the SELECT. Then, keep that projection list only
+ * if it is compatible with the next table schema, else throw it away and
+ * start over from the actual scan projection list.
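+ * <p>
+ * As a sketch of the intended behavior: if file A produces schema
+ * (a, b) and <tt>SELECT *</tt> resolves to (a, b), then a later file B
+ * that produces only (a) can reuse the prior schema, with b filled in
+ * as nulls of its prior type. But if file B produces (a, c), the prior
+ * schema cannot represent c, so a new output schema must be built and
+ * a hard schema change occurs.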
+ * <p> + * Algorithm: + * <ul> + * <li>If partitions are included in the wildcard, and the new + * file needs more than the current one, create a new schema.</li> + * <li>Else, treat partitions as select, fill in missing with + * nulls.</li> + * <li>From an output schema, construct a new select list + * specification as though the columns in the current schema were + * explicitly specified in the SELECT clause.</li> + * <li>For each new schema column, verify that the column exists + * in the generated SELECT clause and is of the same type. + * If not, create a new schema.</li> + * <li>Use the generated schema to plan a new projection from + * the new schema to the prior schema.</li> + * </ul> + */ + +public class SchemaSmoother { + + /** + * Exception thrown if the prior schema is not compatible with the + * new table schema. + */ + + @SuppressWarnings("serial") + public static class IncompatibleSchemaException extends Exception { } + + private final ScanLevelProjection scanProj; + private final List<SchemaProjectionResolver> resolvers; + private ResolvedTuple priorSchema; + private int schemaVersion = 0; + + public SchemaSmoother(ScanLevelProjection scanProj, + List<SchemaProjectionResolver> resolvers) { + this.scanProj = scanProj; + this.resolvers = resolvers; + } + + public SchemaLevelProjection resolve( + TupleMetadata tableSchema, + ResolvedTuple outputTuple) { + + // If a prior schema exists, try resolving the new table using the + // prior schema. If this works, use the projection. Else, start + // over with the scan projection. + + if (priorSchema != null) { + try { + SmoothingProjection smoother = new SmoothingProjection(scanProj, tableSchema, + priorSchema, outputTuple, resolvers); + priorSchema = outputTuple; + return smoother; + } catch (IncompatibleSchemaException e) { + outputTuple.reset(); + // Fall through + } + } + + // Can't use the prior schema. Start over with the original scan projection. + // Type smoothing is provided by the vector cache; but a hard schema change + // will occur because either a type has changed or a new column has appeared. + // (Or, this is the first schema.) + + SchemaLevelProjection schemaProj = new WildcardSchemaProjection(scanProj, + tableSchema, outputTuple, resolvers); + priorSchema = outputTuple; + schemaVersion++; + return schemaProj; + } + + public int schemaVersion() { return schemaVersion; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java new file mode 100644 index 000000000..da199b8c5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/SmoothingProjection.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+
+/**
+ * Resolve a table schema against the prior schema. This works only if the
+ * types match and if all columns in the table schema already appear in the
+ * prior schema.
+ * <p>
+ * Consider this an experimental mechanism. The hope was that, with clever
+ * techniques, we could "smooth over" some of the issues that cause schema
+ * change events in Drill. As it turned out, however, creating this mechanism
+ * revealed that it is not possible, even in theory, to handle most schema
+ * changes because of the time dimension:
+ * <ul>
+ * <li>An event in a later batch may provide information that would have
+ * caused us to make a different decision in an earlier batch. For example,
+ * we are asked for column `foo`, did not see such a column in the first
+ * batch, block or file, guessed some type, and later saw that the column
+ * was of a different type. We can't "time travel" to tell our earlier
+ * selves, nor, when we make the initial type decision, can we jump to
+ * the future to see what type we'll discover.</li>
+ * <li>Readers in this fragment may see column `foo` but readers in
+ * another fragment read files/blocks that don't have that column. The
+ * two readers cannot communicate to agree on a type.</li>
+ * </ul>
+ * <p>
+ * What this mechanism can do is make decisions based on history: when a
+ * column appears, we can adjust its type a bit to try to avoid an
+ * unnecessary change. For example, if a prior file in this scan saw
+ * `foo` as nullable Varchar, but the present file has the column as
+ * required Varchar, we can use the more general nullable form. But,
+ * again, the "can't predict the future" problem bites us: we can handle a
+ * nullable-to-required column change, but not vice versa.
+ * <p>
+ * What this mechanism will tell the careful reader is that the only
+ * general solution to the schema-change problem is to know the full
+ * schema up front: for the planner to be told the schema and to
+ * communicate that schema to all readers so that all readers agree
+ * on the final schema.
+ * <p>
+ * When that is done, the techniques shown here can be used to adjust
+ * any per-file variation of schema to match the up-front schema.
+ */
+
+public class SmoothingProjection extends SchemaLevelProjection {
+
+  protected final List<MaterializedField> rewrittenFields = new ArrayList<>();
+
+  public SmoothingProjection(ScanLevelProjection scanProj,
+      TupleMetadata tableSchema,
+      ResolvedTuple priorSchema,
+      ResolvedTuple outputTuple,
+      List<SchemaProjectionResolver> resolvers) throws IncompatibleSchemaException {
+
+    super(resolvers);
+
+    for (ResolvedColumn priorCol : priorSchema.columns()) {
+      switch (priorCol.nodeType()) {
+      case ResolvedTableColumn.ID:
+      case ResolvedNullColumn.ID:
+        // This is a regular column known to this base framework.
+        resolveColumn(outputTuple, priorCol, tableSchema);
+        break;
+      default:
+        // The column is one known to an add-on mechanism.
+        resolveSpecial(outputTuple, priorCol, tableSchema);
+      }
+    }
+
+    // Check if all table columns were matched.
Since names are unique, + // each column can be matched at most once. If the number of matches is + // less than the total number of columns, then some columns were not + // matched and we must start over. + + if (rewrittenFields.size() < tableSchema.size()) { + throw new IncompatibleSchemaException(); + } + } + + /** + * Resolve a prior column against the current table schema. Resolves to + * a table column, a null column, or throws an exception if the + * schemas are incompatible + * + * @param priorCol a column from the prior schema + * @throws IncompatibleSchemaException if the prior column exists in + * the current table schema, but with an incompatible type + */ + + private void resolveColumn(ResolvedTuple outputTuple, + ResolvedColumn priorCol, TupleMetadata tableSchema) throws IncompatibleSchemaException { + int tableColIndex = tableSchema.index(priorCol.name()); + if (tableColIndex == -1) { + resolveNullColumn(outputTuple, priorCol); + return; + } + MaterializedField tableCol = tableSchema.column(tableColIndex); + MaterializedField priorField = priorCol.schema(); + if (! tableCol.isPromotableTo(priorField, false)) { + throw new IncompatibleSchemaException(); + } + outputTuple.add( + new ResolvedTableColumn(priorCol.name(), priorField, outputTuple, tableColIndex)); + rewrittenFields.add(priorField); + } + + /** + * A prior schema column does not exist in the present table column schema. + * Create a null column with the same type as the prior column, as long as + * the prior column was not required. + * + * @param priorCol the prior column to project to a null column + * @throws IncompatibleSchemaException if the prior column was required + * and thus cannot be null-filled + */ + + private void resolveNullColumn(ResolvedTuple outputTuple, + ResolvedColumn priorCol) throws IncompatibleSchemaException { + if (priorCol.schema().getType().getMode() == DataMode.REQUIRED) { + throw new IncompatibleSchemaException(); + } + outputTuple.add(outputTuple.nullBuilder().add(priorCol.name(), + priorCol.schema().getType())); + } + + public List<MaterializedField> revisedTableSchema() { return rewrittenFields; } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java new file mode 100644 index 000000000..bfd3614b5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/StaticColumnLoader.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.OptionBuilder;
+import org.apache.drill.exec.physical.rowSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.record.VectorContainer;
+
+/**
+ * Base class for columns that take values based on the
+ * reader, not individual rows. E.g. null columns (values
+ * are all null) or file metadata (AKA "implicit") columns
+ * that take values based on the file.
+ */
+
+public abstract class StaticColumnLoader {
+  protected final ResultSetLoader loader;
+  protected final ResultVectorCache vectorCache;
+
+  public StaticColumnLoader(ResultVectorCache vectorCache) {
+
+    ResultSetLoaderImpl.ResultSetOptions options = new OptionBuilder()
+        .setVectorCache(vectorCache)
+        .build();
+    loader = new ResultSetLoaderImpl(vectorCache.allocator(), options);
+    this.vectorCache = vectorCache;
+  }
+
+  /**
+   * Populate static vectors with the defined static values.
+   *
+   * @param rowCount number of rows to generate. Must match the
+   * row count in the batch returned by the reader
+   */
+
+  public abstract VectorContainer load(int rowCount);
+
+  public void close() {
+    loader.close();
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java
new file mode 100644
index 000000000..2dfa9c44d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/UnresolvedColumn.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+
+/**
+ * Represents a projected column that has not yet been bound to a
+ * table column, special column or a null column. Once bound, this
+ * column projection is replaced with the detailed binding.
+ */
+public class UnresolvedColumn implements ColumnProjection {
+
+  public static final int WILDCARD = 1;
+  public static final int UNRESOLVED = 2;
+
+  /**
+   * The original physical plan column to which this output column
+   * maps. In some cases, multiple output columns may map to the
+   * same "input" (to the projection process) column.
+ */ + + protected final RequestedColumn inCol; + private final int id; + + public UnresolvedColumn(RequestedColumn inCol, int id) { + this.inCol = inCol; + this.id = id; + } + + @Override + public int nodeType() { return id; } + + @Override + public String name() { return inCol.name(); } + + public RequestedColumn element() { return inCol; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf + .append("[") + .append(getClass().getSimpleName()) + .append(" type=") + .append(id == WILDCARD ? "wildcard" : "column"); + if (inCol != null) { + buf + .append(", incol=") + .append(inCol.toString()); + } + return buf.append("]").toString(); + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java new file mode 100644 index 000000000..625ee73de --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/VectorSource.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.exec.vector.ValueVector; + +/** + * Generic mechanism for retrieving vectors from a source tuple when + * projecting columns to the output tuple. Works around the fact that + * vector containers and maps are both tuples, but have very different + * interfaces. Also allows other classes to act as "proxies" for a + * source tuple. + */ + +public interface VectorSource { + ValueVector vector(int index); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java new file mode 100644 index 000000000..d1cfbe171 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/WildcardSchemaProjection.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.List; + +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleMetadata; + +/** + * Perform a wildcard projection. In this case, the query wants all + * columns in the source table, so the table drives the final projection. + * Since we include only those columns in the table, there is no need + * to create null columns. 
Example: SELECT *
+ */
+
+public class WildcardSchemaProjection extends SchemaLevelProjection {
+
+ public WildcardSchemaProjection(ScanLevelProjection scanProj,
+ TupleMetadata tableSchema,
+ ResolvedTuple rootTuple,
+ List<SchemaProjectionResolver> resolvers) {
+ super(resolvers);
+ for (ColumnProjection col : scanProj.columns()) {
+ if (col.nodeType() == UnresolvedColumn.WILDCARD) {
+ projectAllColumns(rootTuple, tableSchema);
+ } else {
+ resolveSpecial(rootTuple, col, tableSchema);
+ }
+ }
+ }
+
+ /**
+ * Project all columns from the table schema to the output, in table
+ * schema order. Since we accept any map columns as-is, there is no
+ * need for recursive projection.
+ *
+ * @param rootTuple the resolved row to which the columns are added
+ * @param tableSchema the schema of the table whose columns are
+ * projected, in table order
+ */
+
+ private void projectAllColumns(ResolvedTuple rootTuple, TupleMetadata tableSchema) {
+ for (int i = 0; i < tableSchema.size(); i++) {
+ MaterializedField colSchema = tableSchema.column(i);
+ rootTuple.add(
+ new ResolvedTableColumn(colSchema.getName(),
+ colSchema, rootTuple, i));
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
new file mode 100644
index 000000000..224c69ad1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/package-info.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides run-time semantic analysis of the projection list for the
+ * scan operator. The project list can include table columns and a
+ * variety of special columns. Requested columns can exist in the table,
+ * or may be "missing" with null values applied. The code here prepares
+ * a run-time projection plan based on the actual table schema.
+ * <p>
+ * The core concept is one of successive refinement of the project
+ * list through a set of rewrites:
+ * <ul>
+ * <li>Scan-level rewrite: convert {@link SchemaPath} entries into
+ * internal column nodes, tagging the nodes with the column type:
+ * wildcard, unresolved table column, or special columns (such as
+ * file metadata.) The scan-level rewrite is done once per scan
+ * operator.</li>
+ * <li>Reader-level rewrite: convert the internal column nodes into
+ * other internal nodes, leaving table column nodes unresolved. The
+ * typical use is to fill in metadata columns with information about a
+ * specific file.</li>
+ * <li>Schema-level rewrite: given the actual schema of a record batch,
+ * rewrite the reader-level projection to describe the final projection
+ * from incoming data to output container.
This step fills in missing + * columns, expands wildcards, etc.</li> + * </ul> + * The following outlines the steps from scan plan to per-file data + * loading to producing the output batch. The center path is the + * projection metadata which turns into an actual output batch. + * <pre> + * Scan Plan + * | + * v + * +--------------+ + * | Project List | + * | Parser | + * +--------------+ + * | + * v + * +------------+ + * | Scan Level | + * | Projection | -----------+ + * +------------+ | + * | | + * v v + * +------+ +------------+ +------------+ +-----------+ + * | File | ---> | File Level | | Result Set | ---> | Data File | + * | Data | | Projection | | Loader | <--- | Reader | + * +------+ +------------+ +------------+ +-----------+ + * | | + * v | + * +--------------+ Table | + * | Schema Level | Schema | + * | Projection | <---------+ + * +--------------+ | + * | | + * v | + * +--------+ Loaded | + * | Output | Vectors | + * | Mapper | <------------+ + * +--------+ + * | + * v + * Output Batch + * </pre> + * <p> + * The output mapper includes mechanisms to populate implicit columns, create + * null columns, and to merge implicit, null and data columns, omitting + * unprojected data columns. + * <p> + * In all cases, projection must handle maps, which are a recursive structure + * much like a row. That is, Drill consists of nested tuples (the row and maps), + * each of which contains columns which can be maps. Thus, there is a set of + * alternating layers of tuples, columns, tuples, and so on until we get to leaf + * (non-map) columns. As a result, most of the above structures are in the form + * of tuple trees, requiring recursive algorithms to apply rules down through the + * nested layers of tuples. + * <p> + * The above mechanism is done at runtime, in each scan fragment. Since Drill is + * schema-on-read, and has no plan-time schema concept, run-time projection is + * required. On the other hand, if Drill were ever to support the "classic" + * plan-time schema resolution, then much of this work could be done at plan + * time rather than (redundantly) at runtime. The main change would be to do + * the work abstractly, working with column and row descriptions, rather than + * concretely with vectors as is done here. Then, that abstract description + * would feed directly into these mechanisms with the "final answer" about + * projection, batch layout, and so on. The parts of this mechanism that + * create and populate vectors would remain. + */ + +package org.apache.drill.exec.physical.impl.scan.project;
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java index 0dde5979b..3488e7bb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java @@ -23,7 +23,7 @@ import java.util.Map; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.rowSet.ResultVectorCache; -import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState; +import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState; import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.OffsetVectorState; import org.apache.drill.exec.physical.rowSet.impl.UnionState.UnionVectorState; import org.apache.drill.exec.physical.rowSet.project.RequestedTuple; @@ -111,14 +111,14 @@ public class ListState extends ContainerState private final ColumnMetadata schema; private final ListVector vector; - private final FixedWidthVectorState bitsVectorState; - private final OffsetVectorState offsetVectorState; + private final VectorState bitsVectorState; + private final VectorState offsetVectorState; private VectorState memberVectorState; public ListVectorState(UnionWriterImpl writer, ListVector vector) { this.schema = writer.schema(); this.vector = vector; - bitsVectorState = new FixedWidthVectorState(writer, vector.getBitsVector()); + bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector()); offsetVectorState = new OffsetVectorState(writer, vector.getOffsetVector(), writer.elementPosition()); memberVectorState = new NullVectorState(); } @@ -126,7 +126,7 @@ public class ListState extends ContainerState public ListVectorState(ListWriterImpl writer, WriterPosition elementWriter, ListVector vector) { this.schema = writer.schema(); this.vector = vector; - bitsVectorState = new FixedWidthVectorState(writer, vector.getBitsVector()); + bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector()); offsetVectorState = new OffsetVectorState(writer, vector.getOffsetVector(), elementWriter); memberVectorState = new NullVectorState(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java index f9c2bff2e..b01292074 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.physical.rowSet.impl; -import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.FixedWidthVectorState; +import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.IsSetVectorState; import org.apache.drill.exec.physical.rowSet.impl.SingleVectorState.SimpleVectorState; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.NullableVector; @@ -31,15 +31,15 @@ public class NullableVectorState implements VectorState { private final ColumnMetadata schema; private final NullableScalarWriter writer; private final NullableVector vector; - private final SimpleVectorState bitsState; - private final SimpleVectorState valuesState; + private final VectorState bitsState; + 
private final VectorState valuesState; public NullableVectorState(AbstractObjectWriter writer, NullableVector vector) { this.schema = writer.schema(); this.vector = vector; this.writer = (NullableScalarWriter) writer.scalar(); - bitsState = new FixedWidthVectorState(this.writer.bitsWriter(), vector.getBitsVector()); + bitsState = new IsSetVectorState(this.writer.bitsWriter(), vector.getBitsVector()); valuesState = SimpleVectorState.vectorState(this.writer.schema(), this.writer.baseWriter(), vector.getValuesVector()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java index cfb7130ce..e57373caa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java @@ -85,6 +85,23 @@ public abstract class SingleVectorState implements VectorState { } } + public static class IsSetVectorState extends FixedWidthVectorState { + + public IsSetVectorState(WriterPosition writer, ValueVector mainVector) { + super(writer, mainVector); + } + + @Override + public int allocateVector(ValueVector vector, int cardinality) { + int size = super.allocateVector(vector, cardinality); + + // IsSet ("bit") vectors rely on values being initialized to zero (unset.) + + ((FixedWidthVector) vector).zeroVector(); + return size; + } + } + /** * State for a scalar value vector. The vector might be for a simple (non-array) * vector, or might be the payload part of a scalar array (repeated scalar) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java index 45a704c07..cd782c766 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/project/RequestedTupleImpl.java @@ -44,13 +44,14 @@ import org.apache.drill.exec.record.metadata.TupleNameSpace; * <p> * Examples:<br> * <code>m</code><br> - * If m turns out to be a map, project all members of m.<br> + * If <code>m</code> turns out to be a map, project all members of + * <code>m</code>.<br> * <code>m.a</code><br> - * Column m must be a map. Project only column a.<br> + * Column <code>m</code> must be a map. Project only column <code>a</code>.<br> * <code>m, m.a</code><br> * Tricky case. We interpret this as projecting only the "a" element of map m. * <p> - * The projection set is build from a list of columns, represented as + * The projection set is built from a list of columns, represented as * {@link SchemaPath} objects, provided by the physical plan. The structure of * <tt>SchemaPath</tt> is a bit awkward: * <p> diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java new file mode 100644 index 000000000..c69357a71 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan; + +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; +import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; + +public class ScanTestUtils { + + /** + * Type-safe way to define a list of parsers. + * @param parsers as a varArgs list convenient for testing + * @return parsers as a Java List for input to the scan + * projection framework + */ + + public static List<ScanProjectionParser> parsers(ScanProjectionParser... parsers) { + return ImmutableList.copyOf(parsers); + } + + public static List<SchemaProjectionResolver> resolvers(SchemaProjectionResolver... resolvers) { + return ImmutableList.copyOf(resolvers); + } + + public static TupleMetadata schema(ResolvedTuple output) { + final TupleMetadata schema = new TupleSchema(); + for (final ResolvedColumn col : output.columns()) { + MaterializedField field = col.schema(); + if (field.getType() == null) { + + // Convert from internal format of null columns (unset type) + // to a usable form (explicit minor type of NULL.) + + field = MaterializedField.create(field.getName(), + Types.optional(MinorType.NULL)); + } + schema.add(field); + } + return schema; + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java new file mode 100644 index 000000000..c524577d9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Drill allows file metadata columns (also called "implicit" columns.)
+ * These are columns that contain long, repeated sequences of the same
+ * values. The ConstantColumnLoader builds and populates these columns.
+ */
+
+public class TestConstantColumnLoader extends SubOperatorTest {
+
+ private static class DummyColumn implements ConstantColumnSpec {
+
+ private final String name;
+ private final MaterializedField schema;
+ private final String value;
+
+ public DummyColumn(String name, MajorType type, String value) {
+ this.name = name;
+ this.schema = MaterializedField.create(name, type);
+ this.value = value;
+ }
+
+ @Override
+ public String name() { return name; }
+
+ @Override
+ public MaterializedField schema() { return schema; }
+
+ @Override
+ public String value() { return value; }
+ }
+
+ /**
+ * Test the constant column loader using one column of each mode:
+ * a required VARCHAR and a nullable (optional) VARCHAR. The loader
+ * simply copies each column's predefined string value into every
+ * row of the batch.
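+ * <p>
+ * Expected result, as verified below: a two-row batch in which every
+ * row carries the same ("a-value", "b-value") pair.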
+ */
+
+ @Test
+ public void testConstantColumnLoader() {
+
+ final MajorType aType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.REQUIRED)
+ .build();
+ final MajorType bType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+
+ final List<ConstantColumnSpec> defns = new ArrayList<>();
+ defns.add(
+ new DummyColumn("a", aType, "a-value"));
+ defns.add(
+ new DummyColumn("b", bType, "b-value"));
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ final ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", aType)
+ .add("b", bType)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("a-value", "b-value")
+ .addRow("a-value", "b-value")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
new file mode 100644
index 000000000..4bf5a6ad9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.drill.exec.physical.impl.scan.project; + +import static org.junit.Assert.assertSame; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl; +import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.Test; + +/** + * Test the mechanism that handles all-null columns during projection. + * An all-null column is one projected in the query, but which does + * not actually exist in the underlying data source (or input + * operator.) + * <p> + * In anticipation of having type information, this mechanism + * can create the classic nullable Int null column, or one of + * any other type and mode. + */ + +public class TestNullColumnLoader extends SubOperatorTest { + + private ResolvedNullColumn makeNullCol(String name, MajorType nullType) { + + // For this test, we don't need the projection, so just + // set it to null. + + return new ResolvedNullColumn(name, nullType, null, 0); + } + + private ResolvedNullColumn makeNullCol(String name) { + return makeNullCol(name, null); + } + + /** + * Test the simplest case: default null type, nothing in the vector + * cache. Specify no column type, the special NULL type, or a + * predefined type. Output types should be set accordingly. 
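+ * <p>
+ * Expected mapping, as verified below: no type and the NULL type both
+ * become the default nullable INT; an explicit OPTIONAL or REPEATED
+ * type is used as-is; an explicit REQUIRED type is demoted to
+ * OPTIONAL, since a non-nullable vector cannot represent a null
+ * value.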
+ */ + + @Test + public void testBasics() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("unspecified", null)); + defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL))); + defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR))); + defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR))); + defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR))); + + final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE) + .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE) + .addNullable("specifiedOpt", MinorType.VARCHAR) + .addNullable("specifiedReq", MinorType.VARCHAR) + .addArray("specifiedArray", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null, null, null, new String[] {}) + .addRow(null, null, null, null, new String[] {}) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Test the ability to use a type other than nullable INT for null + * columns. This occurs, for example, in the CSV reader where no + * column is ever INT (nullable or otherwise) and we want our null + * columns to be (non-nullable) VARCHAR. + */ + + @Test + public void testCustomNullType() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("unspecified", null)); + defns.add(makeNullCol("nullType", MajorType.newBuilder() + .setMinorType(MinorType.NULL) + .setMode(DataMode.OPTIONAL) + .build())); + + // Null required is an oxymoron, so is not tested. + // Null type array does not make sense, so is not tested. + + final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + final MajorType nullType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("unspecified", nullType) + .add("nullType", nullType) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null) + .addRow(null, null) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Drill requires "schema persistence": if a scan operator + * reads two files, F1 and F2, then the scan operator must + * provide the same vectors from both readers. Not just the + * same types, the same value vector instances (but, of course, + * populated with different data.) + * <p> + * Test the case in which the reader for F1 found columns + * (a, b, c) but, F2 found only (a, b), requiring that we + * fill in column c, filled with nulls, but of the same type that it + * was in file F1. We use a vector cache to pull off this trick. + * This test ensures that the null column mechanism looks in that + * vector cache when asked to create a nullable column. 
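+ * <p>
+ * In outline: the cache is primed via {@code cache.addOrGet(...)}, as
+ * if while reading F1; the loader for F2 then consults the cache, so
+ * a missing FLOAT8 column comes back as nullable FLOAT8 (or a FLOAT8
+ * array) rather than falling back to the configured null type
+ * (nullable VARCHAR here, used only for the un-cached {@code unk}
+ * column).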
+ */ + + @Test + public void testCachedTypesMapToNullable() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("req")); + defns.add(makeNullCol("opt")); + defns.add(makeNullCol("rep")); + defns.add(makeNullCol("unk")); + + // Populate the cache with a column of each mode. + + final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); + cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED)); + final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL)); + final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED)); + + // Use nullable Varchar for unknown null columns. + + final MajorType nullType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify vectors are reused + + assertSame(opt, output.getValueVector(1).getValueVector()); + assertSame(rep, output.getValueVector(2).getValueVector()); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .addNullable("req", MinorType.FLOAT8) + .addNullable("opt", MinorType.FLOAT8) + .addArray("rep", MinorType.FLOAT8) + .addNullable("unk", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null, new int[] { }, null) + .addRow(null, null, new int[] { }, null) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Suppose, in the previous test, that one of the columns that + * goes missing is a required column. The null-column mechanism can + * create the "null" column as a required column, then fill it with + * empty values (zero or "") -- if the scan operator feels doing so would + * be helpful. + */ + + @Test + public void testCachedTypesAllowRequired() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("req")); + defns.add(makeNullCol("opt")); + defns.add(makeNullCol("rep")); + defns.add(makeNullCol("unk")); + + // Populate the cache with a column of each mode. + + final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); + cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED)); + final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL)); + final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED)); + + // Use nullable Varchar for unknown null columns. 
+ + final MajorType nullType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify vectors are reused + + assertSame(opt, output.getValueVector(1).getValueVector()); + assertSame(rep, output.getValueVector(2).getValueVector()); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("req", MinorType.FLOAT8) + .addNullable("opt", MinorType.FLOAT8) + .addArray("rep", MinorType.FLOAT8) + .addNullable("unk", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(0.0, null, new int[] { }, null) + .addRow(0.0, null, new int[] { }, null) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Test the shim class that adapts between the null column loader + * and the projection mechanism. The projection mechanism uses this + * to pull in the null columns which the null column loader has + * created. + */ + + @Test + public void testNullColumnBuilder() { + + final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + + builder.add("unspecified"); + builder.add("nullType", Types.optional(MinorType.NULL)); + builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR)); + builder.add("specifiedReq", Types.required(MinorType.VARCHAR)); + builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR)); + builder.build(cache); + + // Create a batch + + builder.load(2); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE) + .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE) + .addNullable("specifiedOpt", MinorType.VARCHAR) + .addNullable("specifiedReq", MinorType.VARCHAR) + .addArray("specifiedArray", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null, null, null, new String[] {}) + .addRow(null, null, null, null, new String[] {}) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(builder.output())); + builder.close(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java new file mode 100644 index 000000000..cb0365ca9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+
+
+/**
+ * Test the row batch merger by merging two batches. Tests both the
+ * "direct" and "exchange" cases. Direct means that the output container
+ * contains the source vector directly: they are the same vectors.
+ * Exchange means we have two vectors, but we swap the underlying
+ * DrillBufs to effectively shift data from source to destination
+ * vector.
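+ * <p>
+ * Each test drives the merge the same way; in sketch form:
+ * <pre><code>
+ *   VectorContainer output = new VectorContainer(fixture.allocator());
+ *   resolvedTuple.project(input.rowSet().container(), output);
+ *   output.setRecordCount(input.rowSet().rowCount());
+ *   RowSet result = fixture.wrap(output);
+ * </code></pre>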
+ */ + +public class TestRowBatchMerger extends SubOperatorTest { + + public static class RowSetSource implements VectorSource { + + private SingleRowSet rowSet; + + public RowSetSource(SingleRowSet rowSet) { + this.rowSet = rowSet; + } + + public RowSet rowSet() { return rowSet; } + + public void clear() { + rowSet.clear(); + } + + @Override + public ValueVector vector(int index) { + return rowSet.container().getValueVector(index).getValueVector(); + } + } + + public static final int SLAB_SIZE = 16 * 1024 * 1024; + + @BeforeClass + public static void setup() { + // Party on 10 16MB blocks of memory to detect vector issues + DrillBuf bufs[] = new DrillBuf[10]; + for (int i = 0; i < bufs.length; i++) { + bufs[i] = fixture.allocator().buffer(SLAB_SIZE); + for (int j = 0; j < SLAB_SIZE / 4; j++) { + bufs[i].setInt(j * 4, 0xDEADBEEF); + } + } + for (int i = 0; i < bufs.length; i++) { + bufs[i].release(); + } + } + + private RowSetSource makeFirst() { + BatchSchema firstSchema = new SchemaBuilder() + .add("d", MinorType.VARCHAR) + .add("a", MinorType.INT) + .build(); + return new RowSetSource( + fixture.rowSetBuilder(firstSchema) + .addRow("barney", 10) + .addRow("wilma", 20) + .build()); + } + + private RowSetSource makeSecond() { + BatchSchema secondSchema = new SchemaBuilder() + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .build(); + return new RowSetSource( + fixture.rowSetBuilder(secondSchema) + .addRow(1, "foo.csv") + .addRow(2, "foo.csv") + .build()); + } + + public static class TestProjection extends ResolvedColumn { + + public TestProjection(VectorSource source, int sourceIndex) { + super(source, sourceIndex); + } + + @Override + public String name() { return null; } + + @Override + public int nodeType() { return -1; } + + @Override + public MaterializedField schema() { return null; } + } + + @Test + public void testSimpleFlat() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create the second batch + + RowSetSource second = makeSecond(); + + ResolvedRow resolvedTuple = new ResolvedRow(null); + resolvedTuple.add(new TestProjection(first, 1)); + resolvedTuple.add(new TestProjection(second, 0)); + resolvedTuple.add(new TestProjection(second, 1)); + resolvedTuple.add(new TestProjection(first, 0)); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(null, output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, 1, "foo.csv", "barney") + .addRow(20, 2, "foo.csv", "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + @Test + public void testImplicitFlat() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create the second batch + + RowSetSource second = makeSecond(); + + ResolvedRow resolvedTuple = new ResolvedRow(null); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + resolvedTuple.add(new TestProjection(second, 0)); + resolvedTuple.add(new TestProjection(second, 1)); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + 
output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, 1, "foo.csv", "barney") + .addRow(20, 2, "foo.csv", "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + @Test + public void testFlatWithNulls() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create null columns + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + + ResolvedRow resolvedTuple = new ResolvedRow(builder); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + resolvedTuple.add(resolvedTuple.nullBuilder().add("null1")); + resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + builder.build(cache); + builder.load(first.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("null1", MinorType.INT) + .addNullable("null2", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, null, null, "barney") + .addRow(20, null, null, "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + builder.close(); + } + + /** + * Test the ability to create maps from whole cloth if requested in + * the projection list, and the map is not available from the data + * source. 
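+ * <p>
+ * Conceptually, each output row below has the shape
+ * {@code (a, map1: {null1, null2, map2: {null3}}, d)}, with every
+ * null-column member set to null.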
+ */ + + @Test + public void testNullMaps() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create null columns + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + + ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1"); + ResolvedTuple nullMap = nullMapCol.members(); + nullMap.add(nullMap.nullBuilder().add("null1")); + nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); + + ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2"); + ResolvedTuple nullMap2 = nullMapCol2.members(); + nullMap2.add(nullMap2.nullBuilder().add("null3")); + nullMap.add(nullMapCol2); + + resolvedTuple.add(nullMapCol); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(first.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + resolvedTuple.setRowCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("map1") + .addNullable("null1", MinorType.INT) + .addNullable("null2", MinorType.VARCHAR) + .addMap("map2") + .addNullable("null3", MinorType.INT) + .resumeMap() + .resumeSchema() + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, mapValue(null, null, singleMap(null)), "barney") + .addRow(20, mapValue(null, null, singleMap(null)), "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + resolvedTuple.close(); + } + + /** + * Test that the merger mechanism can rewrite a map to include + * projected null columns. 
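+ * <p>
+ * Here the input provides map {@code a} with only member {@code c};
+ * the query also wants {@code a.null1}, so the merger must rebuild
+ * the map as {@code a{c, null1}}, passing {@code c} through and
+ * filling {@code null1} with nulls.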
+ */ + + @Test + public void testMapRevision() { + + // Create the first batch + + BatchSchema inputSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMap("a") + .add("c", MinorType.INT) + .resumeSchema() + .build(); + RowSetSource input = new RowSetSource( + fixture.rowSetBuilder(inputSchema) + .addRow("barney", singleMap(10)) + .addRow("wilma", singleMap(20)) + .build()); + + // Create mappings + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, + inputSchema.getColumn(1), 1); + resolvedTuple.add(mapCol); + ResolvedTuple map = mapCol.members(); + map.add(new TestProjection(map, 0)); + map.add(map.nullBuilder().add("null1")); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(input.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(input.rowSet().container(), output); + output.setRecordCount(input.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMap("a") + .add("c", MinorType.INT) + .addNullable("null1", MinorType.INT) + .resumeSchema() + .build(); + RowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("barney", mapValue(10, null)) + .addRow("wilma", mapValue(20, null)) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + /** + * Test that the merger mechanism can rewrite a map array to include + * projected null columns. 
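+ * <p>
+ * Same as the previous test, but with a repeated map: every element
+ * of the map array must pick up the new member, so {@code {c: 10}}
+ * becomes {@code {c: 10, null1: null}}, and so on for each element.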
+ */ + + @Test + public void testMapArrayRevision() { + + // Create the first batch + + BatchSchema inputSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMapArray("a") + .add("c", MinorType.INT) + .resumeSchema() + .build(); + RowSetSource input = new RowSetSource( + fixture.rowSetBuilder(inputSchema) + .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12))) + .addRow("wilma", mapArray(singleMap(20), singleMap(21))) + .build()); + + // Create mappings + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, + inputSchema.getColumn(1), 1); + resolvedTuple.add(mapCol); + ResolvedTuple map = mapCol.members(); + map.add(new TestProjection(map, 0)); + map.add(map.nullBuilder().add("null1")); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(input.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(input.rowSet().container(), output); + output.setRecordCount(input.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMapArray("a") + .add("c", MinorType.INT) + .addNullable("null1", MinorType.INT) + .resumeSchema() + .build(); + RowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("barney", mapArray( + mapValue(10, null), mapValue(11, null), mapValue(12, null))) + .addRow("wilma", mapArray( + mapValue(20, null), mapValue(21, null))) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java new file mode 100644 index 000000000..e07ad9a62 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+/**
+ * Test the projection planning done at the level of the scan as a
+ * whole, before knowledge of "implicit" file metadata columns or of
+ * the specific table schema.
+ */
+
+public class TestScanLevelProjection extends SubOperatorTest {
+
+ /**
+ * Basic test: select a set of columns (a, b, c) and verify that the
+ * scan-level projection parses it into three unresolved columns, in
+ * the order requested. No table schema is involved at this level.
+ */
+
+ @Test
+ public void testBasics() {
+
+ // Simulate SELECT a, b, c ...
+ // Build the projection plan and verify
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", "b", "c"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(3, scanProj.requestedCols().size());
+ assertEquals("a", scanProj.requestedCols().get(0).rootName());
+ assertEquals("b", scanProj.requestedCols().get(1).rootName());
+ assertEquals("c", scanProj.requestedCols().get(2).rootName());
+
+ assertEquals(3, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+ assertEquals("b", scanProj.columns().get(1).name());
+ assertEquals("c", scanProj.columns().get(2).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+ }
+
+ /**
+ * Map projection occurs when a query contains project-list items with
+ * a dot, such as "a.b". We may not know the type of "b", but have
+ * just learned that "a" must be a map.
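+ * <p>
+ * For example, the list {@code a.x, b.x, a.y, b.y, c} consolidates
+ * to three scan-level columns: maps {@code a} and {@code b}, each
+ * projecting members {@code x} and {@code y}, plus the simple column
+ * {@code c}.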
+ */
+
+ @Test
+ public void testMap() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(3, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+ assertEquals("b", scanProj.columns().get(1).name());
+ assertEquals("c", scanProj.columns().get(2).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+ // Map structure
+
+ final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+ assertTrue(a.isTuple());
+ assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
+ assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
+ assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z"));
+
+ final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
+ assertTrue(c.isSimple());
+ }
+
+ /**
+ * Similar to maps, if the project list contains "a[1]" then we've learned that
+ * a is an array, but we don't know what type.
+ */
+
+ @Test
+ public void testArray() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[1]", "a[3]"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+ // Array structure
+
+ final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+ assertTrue(a.isArray());
+ assertFalse(a.hasIndex(0));
+ assertTrue(a.hasIndex(1));
+ assertFalse(a.hasIndex(2));
+ assertTrue(a.hasIndex(3));
+ }
+
+ /**
+ * Simulate a SELECT * query by passing "*" as a column name.
+ */
+
+ @Test
+ public void testWildcard() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ assertTrue(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+ assertEquals(1, scanProj.requestedCols().size());
+ assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
+
+ // Verify bindings
+
+ assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
+ }
+
+ /**
+ * Test an empty projection which occurs in a
+ * SELECT COUNT(*) query.
+ */
+
+ @Test
+ public void testEmptyProjection() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(),
+ ScanTestUtils.parsers());
+
+ assertFalse(scanProj.projectAll());
+ assertTrue(scanProj.projectNone());
+ assertEquals(0, scanProj.requestedCols().size());
+ }
+
+ /**
+ * Can't include both a wildcard and a column name.
+ */
+
+ @Test
+ public void testErrorWildcardAndColumns() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Can't include both a column name and a wildcard.
+ */ + + @Test + public void testErrorColumnAndWildcard() { + try { + new ScanLevelProjection( + RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR), + ScanTestUtils.parsers()); + fail(); + } catch (final IllegalArgumentException e) { + // Expected + } + } + + /** + * Can't include a wildcard twice. + * <p> + * Note: Drill actually allows this, but the work should be done + * in the project operator; scan should see at most one wildcard. + */ + + @Test + public void testErrorTwoWildcards() { + try { + new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR), + ScanTestUtils.parsers()); + fail(); + } catch (final UserException e) { + // Expected + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java new file mode 100644 index 000000000..839e2caa4 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * "Schema level projection" describes one side of the projection
+ * mechanism. When we project, we have the set of columns the user
+ * wants (the scan-level projection list) and the set of columns on
+ * offer from the data source (the table schema). The schema-level
+ * projection combines the two to map out the actual projection.
+ */
+
+public class TestSchemaLevelProjection extends SubOperatorTest {
+
+ /**
+ * Test wildcard projection: take all columns on offer from
+ * the data source, in the order that the data source specifies.
+ */
+
+ @Test
+ public void testWildcard() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .addNullable("c", MinorType.INT)
+ .addArray("d", MinorType.FLOAT8)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new WildcardSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ assertEquals("a", columns.get(0).name());
+ assertEquals(0, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+ assertEquals("c", columns.get(1).name());
+ assertEquals(1, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+ assertEquals("d", columns.get(2).name());
+ assertEquals(2, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+ }
+
+ /**
+ * Test a SELECT list whose columns differ from the early-schema
+ * table in both order and name case.
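+ * <p>
+ * Concretely, as verified below: SELECT c, b, a against a table
+ * (A, B, C) resolves to source indexes (2, 1, 0), and the output
+ * uses the names as spelled in the project list.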
+
+  /**
+   * Test a SELECT list with columns defined in a different order and
+   * with name case different from that of the early-schema table.
+   */
+
+  @Test
+  public void testFullList() {
+
+    // Simulate SELECT c, b, a ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("c", "b", "a"),
+        ScanTestUtils.parsers());
+    assertEquals(3, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a, b, c)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("B", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(3, columns.size());
+
+    assertEquals("c", columns.get(0).name());
+    assertEquals(2, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+
+    assertEquals("b", columns.get(1).name());
+    assertEquals(1, columns.get(1).sourceIndex());
+    assertSame(rootTuple, columns.get(1).source());
+
+    assertEquals("a", columns.get(2).name());
+    assertEquals(0, columns.get(2).sourceIndex());
+    assertSame(rootTuple, columns.get(2).source());
+  }
+
+  /**
+   * Test a SELECT list with columns missing from the table schema.
+   */
+
+  @Test
+  public void testMissing() {
+
+    // Simulate SELECT c, v, b, w ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("c", "v", "b", "w"),
+        ScanTestUtils.parsers());
+    assertEquals(4, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a, b, c)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("B", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(4, columns.size());
+    final VectorSource nullBuilder = rootTuple.nullBuilder();
+
+    assertEquals("c", columns.get(0).name());
+    assertEquals(2, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+
+    assertEquals("v", columns.get(1).name());
+    assertEquals(0, columns.get(1).sourceIndex());
+    assertSame(nullBuilder, columns.get(1).source());
+
+    assertEquals("b", columns.get(2).name());
+    assertEquals(1, columns.get(2).sourceIndex());
+    assertSame(rootTuple, columns.get(2).source());
+
+    assertEquals("w", columns.get(3).name());
+    assertEquals(1, columns.get(3).sourceIndex());
+    assertSame(nullBuilder, columns.get(3).source());
+  }
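[Editor's note] The expectations in testMissing follow one rule: a projected name that matches a table column (case-insensitively) resolves to the table tuple at that column's index; any other name resolves to the row's null-column builder. A rough sketch of that rule, using hypothetical lookup helpers (findIndex, resolveToTable, resolveToNull) rather than the real resolver internals:

    // Sketch of the rule verified above; all helper names are invented,
    // not the actual ExplicitSchemaProjection API.
    for (final String colName : projectedNames) {
      final int tableIndex = findIndex(tableSchema, colName); // case-insensitive
      if (tableIndex >= 0) {
        // Matched: read the value from the table at that index.
        resolveToTable(rootTuple, colName, tableIndex);
      } else {
        // Unmatched: synthesize a null column via the null builder.
        resolveToNull(rootTuple.nullBuilder(), colName);
      }
    }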
+
+  /**
+   * Test an explicit projection (providing columns) in which the
+   * names in the project list differ in case from those in the data
+   * source, the order of columns differs, and we ask for a
+   * subset of data source columns.
+   */
+  @Test
+  public void testSubset() {
+
+    // Simulate SELECT c, a ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("c", "a"),
+        ScanTestUtils.parsers());
+    assertEquals(2, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a, b, c)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .add("B", MinorType.VARCHAR)
+        .add("C", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(2, columns.size());
+
+    assertEquals("c", columns.get(0).name());
+    assertEquals(2, columns.get(0).sourceIndex());
+    assertSame(rootTuple, columns.get(0).source());
+
+    assertEquals("a", columns.get(1).name());
+    assertEquals(0, columns.get(1).sourceIndex());
+    assertSame(rootTuple, columns.get(1).source());
+  }
+
+  /**
+   * Drill is unique in that we can select (a, b) from a data source
+   * that only offers (c, d). We get null columns as a result.
+   */
+
+  @Test
+  public void testDisjoint() {
+
+    // Simulate SELECT b ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("b"),
+        ScanTestUtils.parsers());
+    assertEquals(1, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(1, columns.size());
+    final VectorSource nullBuilder = rootTuple.nullBuilder();
+
+    assertEquals("b", columns.get(0).name());
+    assertEquals(0, columns.get(0).sourceIndex());
+    assertSame(nullBuilder, columns.get(0).source());
+  }
+
+  /**
+   * Test the obscure case in which the project list asks for a member
+   * of a map (b.c.d), but the data source does not offer column b at
+   * all. The projection framework builds up the missing structure:
+   * b and c resolve to maps, and d resolves to a null column.
+   */
+
+  @Test
+  public void testOmittedMap() {
+
+    // Simulate SELECT a, b.c.d ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a", "b.c.d"),
+        ScanTestUtils.parsers());
+    assertEquals(2, scanProj.columns().size());
+    {
+      assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType());
+      final UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1));
+      assertTrue(bCol.element().isTuple());
+    }
+
+    // Simulate a data source, with early schema, of (a)
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(2, columns.size());
+
+    // Should have resolved a to a table column, b to a missing map.
+
+    // A is projected
+
+    final ResolvedColumn aCol = columns.get(0);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+
+    // B is not projected, is implicitly a map
+
+    final ResolvedColumn bCol = columns.get(1);
+    assertEquals("b", bCol.name());
+    assertEquals(ResolvedMapColumn.ID, bCol.nodeType());
+
+    final ResolvedMapColumn bMap = (ResolvedMapColumn) bCol;
+    final ResolvedTuple bMembers = bMap.members();
+    assertNotNull(bMembers);
+    assertEquals(1, bMembers.columns().size());
+
+    // C is a map within b
+
+    final ResolvedColumn cCol = bMembers.columns().get(0);
+    assertEquals(ResolvedMapColumn.ID, cCol.nodeType());
+
+    final ResolvedMapColumn cMap = (ResolvedMapColumn) cCol;
+    final ResolvedTuple cMembers = cMap.members();
+    assertNotNull(cMembers);
+    assertEquals(1, cMembers.columns().size());
+
+    // D is an unknown column type (not a map)
+
+    final ResolvedColumn dCol = cMembers.columns().get(0);
+    assertEquals(ResolvedNullColumn.ID, dCol.nodeType());
+  }
+
+  /**
+   * Test of a map with missing columns.
+   * Table of (x, y, a{b, c}); project x, a.c, a.d, a.e.f, y.
+   */
+
+  @Test
+  public void testOmittedMapMembers() {
+
+    // Simulate SELECT x, a.c, a.d, a.e.f, y ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
+        ScanTestUtils.parsers());
+    assertEquals(3, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (x, y, a{b, c})
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("x", MinorType.VARCHAR)
+        .add("y", MinorType.INT)
+        .addMap("a")
+          .add("b", MinorType.BIGINT)
+          .add("c", MinorType.FLOAT8)
+          .resumeSchema()
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(3, columns.size());
+
+    // Should have resolved a.c to a map member, a.d to a missing
+    // (null) member, and a.e.f to a null member of a missing
+    // nested map
+
+    // X is projected
+
+    final ResolvedColumn xCol = columns.get(0);
+    assertEquals("x", xCol.name());
+    assertEquals(ResolvedTableColumn.ID, xCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source());
+    assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex());
+
+    // Y is projected
+
+    final ResolvedColumn yCol = columns.get(2);
+    assertEquals("y", yCol.name());
+    assertEquals(ResolvedTableColumn.ID, yCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source());
+    assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex());
+
+    // A is projected
+
+    final ResolvedColumn aCol = columns.get(1);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedMapColumn.ID, aCol.nodeType());
+
+    final ResolvedMapColumn aMap = (ResolvedMapColumn) aCol;
+    final ResolvedTuple aMembers = aMap.members();
+    assertFalse(aMembers.isSimpleProjection());
+    assertNotNull(aMembers);
+    assertEquals(3, aMembers.columns().size());
+
+    // a.c is projected
+
+    final ResolvedColumn acCol = aMembers.columns().get(0);
+    assertEquals("c", acCol.name());
+    assertEquals(ResolvedTableColumn.ID, acCol.nodeType());
+    assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex());
+
+    // a.d is not in the table, is null
+
+    final ResolvedColumn adCol = aMembers.columns().get(1);
+    assertEquals("d", adCol.name());
+    assertEquals(ResolvedNullColumn.ID, adCol.nodeType());
+
+    // a.e is not in the table, is implicitly a map
+
+    final ResolvedColumn aeCol = aMembers.columns().get(2);
+    assertEquals("e", aeCol.name());
+    assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
+
+    final ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
+    final ResolvedTuple aeMembers = aeMap.members();
+    assertFalse(aeMembers.isSimpleProjection());
+    assertNotNull(aeMembers);
+    assertEquals(1, aeMembers.columns().size());
+
+    // a.e.f is a null column
+
+    final ResolvedColumn aefCol = aeMembers.columns().get(0);
+    assertEquals("f", aefCol.name());
+    assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
+  }
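[Editor's note] The two map tests above exercise what is, in effect, a recursive resolution rule: resolve each requested member against the map's schema; a missing leaf becomes a null column, while a missing non-leaf becomes a null map whose own members are resolved the same way. A sketch of that recursion, with entirely hypothetical names (the real logic lives inside ExplicitSchemaProjection):

    // Hypothetical recursion illustrating the behavior asserted above;
    // RequestedColumn, tableColumn(), nullColumn() and nullMap() are
    // invented names, not the resolver's actual API.
    ResolvedColumn resolveMember(RequestedColumn req, TupleMetadata mapSchema) {
      if (mapSchema != null && mapSchema.index(req.name()) >= 0) {
        return tableColumn(req, mapSchema);  // a.c: present in the map
      }
      if (req.isLeaf()) {
        return nullColumn(req);              // a.d: missing leaf, null column
      }
      return nullMap(req);                   // a.e.f: missing map, recurse into members
    }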
+ + final TupleMetadata tableSchema = new SchemaBuilder() + .addArray("a", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(1, columns.size()); + + final ResolvedColumn aCol = columns.get(0); + assertEquals("a", aCol.name()); + assertEquals(ResolvedTableColumn.ID, aCol.nodeType()); + assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source()); + assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex()); + } + + /** + * Project of a non-array as an array + */ + + @Test + public void testArrayMismatch() { + + // Simulate SELECT a[0] ... + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("a[0]"), + ScanTestUtils.parsers()); + + // Simulate a data source, with early schema, of (a) + // where a is not an array. + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + try { + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + fail(); + } catch (final UserException e) { + // Expected + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java new file mode 100644 index 000000000..977cd216b --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java @@ -0,0 +1,691 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests schema smoothing at the schema projection level.
+ * This level handles reusing prior types when filling null
+ * values. But, because no actual vectors are involved, it
+ * does not handle the schema chosen for a table ahead of
+ * time, only the schema as it is merged with the prior schema
+ * to detect missing columns.
+ * <p>
+ * Focuses on the <tt>SmoothingProjection</tt> class itself.
+ * <p>
+ * Note that, at present, schema smoothing does not work for entire
+ * maps. That is, if file 1 has, say, <tt>{a: {b: 10, c: "foo"}}</tt>
+ * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
+ * not currently know how to recreate the map. The same is true of
+ * lists and unions. Handling such cases is complex and is probably
+ * better handled via a system that allows the user to specify their
+ * intent by providing a schema to apply to the two files.
+ * <p>
+ * Note that schema smoothing itself is an experimental work-around
+ * to a fundamental limitation in Drill:
+ * <ul>
+ * <li>Drill cannot predict the future: each file (or batch)
+ * may have a different schema.</li>
+ * <li>Drill does not know about these differences until they
+ * occur.</li>
+ * <li>The scan operator is obligated to return the same schema
+ * (and same vectors) from one file to the next, else a "hard"
+ * schema change is sent downstream.</li>
+ * </ul>
+ *
+ * The problem is actually intractable. The schema smoother handles the
+ * cases that can be handled, such as required --> nullable, a column
+ * disappearing, etc. This whole mechanism should be scrapped if/when
+ * Drill can work with schemas. Or, keep this to handle, as best we can,
+ * odd schemas, but insist on a schema to resolve issues that this
+ * mechanism cannot handle (and that, indeed, no algorithm could handle,
+ * because such an algorithm would require time travel: looking into
+ * the future to know what data will be scanned).
+ */
+
+public class TestSchemaSmoothing extends SubOperatorTest {
+
+  /**
+   * Low-level test of the smoothing projection, including the exceptions
+   * it throws when things are not going its way.
+   */
+
+  @Test
+  public void testSmoothingProjection() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+    final TupleMetadata schema1 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    ResolvedRow priorSchema;
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new WildcardSchemaProjection(
+          scanProj, schema1, rootTuple,
+          ScanTestUtils.resolvers());
+      priorSchema = rootTuple;
+    }
+
+    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+    final TupleMetadata schema2 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema2, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      priorSchema = rootTuple;
+    } catch (final IncompatibleSchemaException e) {
+      fail();
+    }
+
+    // Table 3: (a, b, c, d), column added, must replan schema
+
+    final TupleMetadata schema3 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema3, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+
+    // Table 4: (a: double), change type, must replan schema
+
+    final TupleMetadata schema4 = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema4, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+
+    // Table 5: drop a non-nullable column, must replan
+
+    final TupleMetadata schema5 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema5, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+  }
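[Editor's note] testSmoothingProjection above pins down the smoothing contract: a new table schema can reuse the prior resolved schema only when it is a subset of the prior schema with unchanged column types, and any columns it omits are nullable (or repeated); a new column, a changed type, or a dropped required column forces a replan. A condensed sketch of those rules over plain java.util maps, since Drill's metadata classes are beside the point here (all helper parameters are invented, and nullability is reduced to a simple "required" set):

    // Sketch only: the rules the five tables above exercise, written over
    // plain name->type maps rather than Drill's metadata classes.
    boolean canSmooth(Map<String, MinorType> prior, Set<String> priorRequired,
        Map<String, MinorType> table) {
      for (final Map.Entry<String, MinorType> col : table.entrySet()) {
        final MinorType priorType = prior.get(col.getKey());
        if (priorType == null) {
          return false;                 // new column (Table 3): replan
        }
        if (priorType != col.getValue()) {
          return false;                 // changed type (Table 4): replan
        }
      }
      for (final String name : prior.keySet()) {
        if (!table.containsKey(name) && priorRequired.contains(name)) {
          return false;                 // dropped required column (Table 5): replan
        }
      }
      return true;                      // subset with nullable gaps (Table 2): smooth
    }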
+
+  /**
+   * Case in which the table schema is a superset of the prior
+   *
schema. Discard prior schema. Turn off auto expansion of + * metadata for a simpler test. + */ + + @Test + public void testSmaller() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + + { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + smoother.resolve(priorSchema, rootTuple); + assertEquals(1, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema)); + } + { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + smoother.resolve(tableSchema, rootTuple); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + /** + * Case in which the table schema and prior are disjoint + * sets. Discard the prior schema. + */ + + @Test + public void testDisjoint() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) { + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + smoother.resolve(schema, rootTuple); + return rootTuple; + } + + /** + * Column names match, but types differ. Discard the prior schema. + */ + + @Test + public void testDifferentTypes() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + final SchemaSmoother smoother = new SchemaSmoother(scanProj, + ScanTestUtils.resolvers()); + + final TupleMetadata priorSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .buildSchema(); + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("b", MinorType.VARCHAR) + .buildSchema(); + + { + doResolve(smoother, priorSchema); + } + { + final ResolvedRow rootTuple = doResolve(smoother, tableSchema); + assertEquals(2, smoother.schemaVersion()); + assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema)); + } + } + + /** + * The prior and table schemas are identical. Preserve the prior + * schema (though, the output is no different than if we discarded + * the prior schema...) 
+   */
+
+  @Test
+  public void testSameSchemas() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
+      assertTrue(actualSchema.isEquivalent(tableSchema));
+      assertTrue(actualSchema.isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * The prior and table schemas are identical, but the cases of the
+   * names differ. Preserve the case of the first schema.
+   */
+
+  @Test
+  public void testDifferentCase() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.INT)
+        .add("B", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+      final List<ResolvedColumn> columns = rootTuple.columns();
+      assertEquals("a", columns.get(0).name());
+    }
+  }
+
+  /**
+   * Can't preserve the prior schema if it had required columns
+   * that are missing from the new table schema.
+   */
+
+  @Test
+  public void testRequired() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
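[Editor's note] The schemaVersion() assertions in these tests imply the smoother's control flow: try a SmoothingProjection against the prior resolved schema and, on IncompatibleSchemaException, fall back to planning from the new table schema and bump the version. A sketch of that flow; the field names and bookkeeping are hypothetical, while the constructor calls match the ones used in testSmoothingProjection above:

    // Sketch of the resolve flow implied by the schemaVersion() assertions;
    // priorSchema, schemaVersion and resolvers are hypothetical fields.
    public void resolve(TupleMetadata tableSchema, ResolvedRow outputTuple) {
      if (priorSchema != null) {
        try {
          new SmoothingProjection(scanProj, tableSchema, priorSchema,
              outputTuple, resolvers);
          priorSchema = outputTuple;
          return;                      // smoothed: version unchanged
        } catch (final IncompatibleSchemaException e) {
          // Fall through: hard schema change, replan from the table schema.
        }
      }
      new WildcardSchemaProjection(scanProj, tableSchema, outputTuple, resolvers);
      priorSchema = outputTuple;
      schemaVersion++;                 // downstream sees a new schema
    }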
+
+  /**
+   * Preserve the prior schema if the table is a subset and the missing
+   * columns are nullable or repeated.
+   */
+
+  @Test
+  public void testMissingNullableColumns() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * Preserve the prior schema if the table is a subset. Map the table
+   * columns to the output using the prior schema ordering.
+   */
+
+  @Test
+  public void testReordering() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addNullable("a", MinorType.INT)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * Integrated test across multiple schemas at the batch level.
+   */
+
+  @Test
+  public void testSmoothableSchemaBatches() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+    final TupleMetadata schema1 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema1);
+
+      // Just use the original schema.
+
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(1, smoother.schemaVersion());
+    }
+
+    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+    final TupleMetadata schema2 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema2);
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(1, smoother.schemaVersion());
+    }
+
+    // Table 3: (a, b, c, d), column added, must replan schema
+
+    final TupleMetadata schema3 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema3);
+      assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(2, smoother.schemaVersion());
+    }
+
+    // Table 4: drop a non-nullable column, must replan
+
+    final TupleMetadata schema4 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema4);
+      assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(3, smoother.schemaVersion());
+    }
+
+    // Table 5: (a: double), change type, must replan schema
+
+    final TupleMetadata schema5 = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema5);
+      assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(4, smoother.schemaVersion());
+    }
+  }
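[Editor's note] testWildcardSmoothing below drives the full ScanSchemaOrchestrator and uses SchemaTracker as a stand-in for a downstream operator: while smoothing succeeds, the tracked schema version stays put; a disjoint schema bumps it. The consumer-side pattern, distilled from the test itself:

    // Consumer-side pattern used below: detect a hard schema change by
    // watching the tracked schema version across batches.
    final SchemaTracker tracker = new SchemaTracker();
    tracker.trackSchema(projector.output());
    final int baseline = tracker.schemaVersion();
    // ... deliver the next reader's batch ...
    tracker.trackSchema(projector.output());
    if (tracker.schemaVersion() != baseline) {
      // Hard schema change: downstream operators must rebuild
      // any schema-dependent state.
    }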
+
+  /**
+   * A SELECT * query uses the schema of the table as the output schema.
+   * This is trivial when the scanner has one table. But, if two or more
+   * tables occur, then things get interesting. The first table sets the
+   * schema. The second table then has:
+   * <ul>
+   * <li>The same schema: the trivial case.</li>
+   * <li>A subset of the first table: the type of the "missing" column
+   * from the first table is used for a null column in the second table.</li>
+   * <li>A superset or disjoint set of the first schema: this triggers a
+   * hard schema change.</li>
+   * </ul>
+   * <p>
+   * It is an open question whether previous columns should be preserved on
+   * a hard reset. For now, the code implements, and this test verifies, that a
+   * hard reset clears the "memory" of prior schemas.
+   */
+
+  @Test
+  public void testWildcardSmoothing() {
+    final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
+    projector.enableSchemaSmoothing(true);
+    projector.build(RowSetTestUtils.projectAll());
+
+    final TupleMetadata firstSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .addNullable("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata subsetSchema = new SchemaBuilder()
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata disjointSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .add("d", MinorType.VARCHAR)
+        .buildSchema();
+
+    final SchemaTracker tracker = new SchemaTracker();
+    int schemaVersion;
+    {
+      // First table, establishes the baseline
+      // ... FROM table 1
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(10, "fred", 110L)
+          .addRow(20, "wilma", 110L);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      schemaVersion = tracker.schemaVersion();
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(10, "fred", 110L)
+          .addRow(20, "wilma", 110L)
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Second table, same schema, the trivial case
+      // ... FROM table 2
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(70, "pebbles", 770L)
+          .addRow(80, "hoppy", 880L);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(70, "pebbles", 770L)
+          .addRow(80, "hoppy", 880L)
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Third table: subset schema of first two
+      // ... FROM table 3
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow("bambam", 30)
+          .addRow("betty", 40);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(30, "bambam", null)
+          .addRow(40, "betty", null)
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Fourth table: disjoint schema, causes a schema reset
+      // ... FROM table 4
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(50, "dino", "supporting")
+          .addRow(60, "barney", "main");
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertNotEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
+          .addRow(50, "dino", "supporting")
+          .addRow(60, "barney", "main")
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+
+    projector.close();
+  }
+
+  // TODO: Test schema smoothing with repeated
+  // TODO: Test hard schema change
+  // TODO: Typed null column tests (resurrect)
+  // TODO: Test maps and arrays of maps
+}