author     Paul Rogers <progers@cloudera.com>             2018-10-11 22:47:49 -0700
committer  Arina Ielchiieva <arina.yelchiyeva@gmail.com>  2018-12-10 14:03:05 +0200
commit     75b9a788df6d6ea5e73cfedff9e2b47acc27b684 (patch)
tree       68f5dac1edda202c95dcbca6c2c4d109ddb16b3d /exec/java-exec/src/test/java/org/apache/drill/exec/physical
parent     31ba50fe332df04f38d16906d0be2a69790a342f (diff)
DRILL-6791: Scan projection framework
The "schema projection" mechanism: * Handles none (SELECT COUNT\(*)), some (SELECT a, b, x) and all (SELECT *) projection. * Handles null columns (for projection a column "x" that does not exist in the base table.) * Handles constant columns as used for file metadata (AKA "implicit" columns). * Handle schema persistence: the need to reuse the same vectors across different scanners * Provides a framework for consuming externally-supplied metadata * Since we don't yet have a way to provide "real" metadata, obtains metadata hints from previous batches and from the projection list (a.b implies that "a" is a map, c[0] implies that "c" is an array, etc.) * Handles merging the set of data source columns and null columns to create the final output batch. * Running tests found a failure due to an uninialized "bits" vector. Added code to explicitly fill the bits vectors with zeros in the "result set loader."
Diffstat (limited to 'exec/java-exec/src/test/java/org/apache/drill/exec/physical')
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java                       66
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java   114
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java       325
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java         478
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java    234
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java  588
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java        691
7 files changed, 2496 insertions, 0 deletions
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
new file mode 100644
index 000000000..c69357a71
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+public class ScanTestUtils {
+
+ /**
+ * Type-safe way to define a list of parsers.
+ * @param parsers as a varArgs list convenient for testing
+ * @return parsers as a Java List for input to the scan
+ * projection framework
+ */
+
+ public static List<ScanProjectionParser> parsers(ScanProjectionParser... parsers) {
+ return ImmutableList.copyOf(parsers);
+ }
+
+ public static List<SchemaProjectionResolver> resolvers(SchemaProjectionResolver... resolvers) {
+ return ImmutableList.copyOf(resolvers);
+ }
+
+ public static TupleMetadata schema(ResolvedTuple output) {
+ final TupleMetadata schema = new TupleSchema();
+ for (final ResolvedColumn col : output.columns()) {
+ MaterializedField field = col.schema();
+ if (field.getType() == null) {
+
+ // Convert from internal format of null columns (unset type)
+ // to a usable form (explicit minor type of NULL.)
+
+ field = MaterializedField.create(field.getName(),
+ Types.optional(MinorType.NULL));
+ }
+ schema.add(field);
+ }
+ return schema;
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
new file mode 100644
index 000000000..c524577d9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Drill allows file metadata columns (also called "implicit" columns).
+ * These are columns that contain long repeated sequences of the same
+ * value. The ConstantColumnLoader builds and populates these columns.
+ */
+
+public class TestConstantColumnLoader extends SubOperatorTest {
+
+ private static class DummyColumn implements ConstantColumnSpec {
+
+ private final String name;
+ private final MaterializedField schema;
+ private final String value;
+
+ public DummyColumn(String name, MajorType type, String value) {
+ this.name = name;
+ this.schema = MaterializedField.create(name, type);
+ this.value = value;
+ }
+
+ @Override
+ public String name() { return name; }
+
+ @Override
+ public MaterializedField schema() { return schema; }
+
+ @Override
+ public String value() { return value; }
+ }
+
+ /**
+   * Test the constant column loader using one column of each
+   * cardinality: a required VARCHAR and a nullable VARCHAR. The
+   * loader should repeat the given constant value in every row
+   * of both columns.
+ */
+
+ @Test
+ public void testConstantColumnLoader() {
+
+ final MajorType aType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.REQUIRED)
+ .build();
+ final MajorType bType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+
+ final List<ConstantColumnSpec> defns = new ArrayList<>();
+ defns.add(
+ new DummyColumn("a", aType, "a-value" ));
+ defns.add(
+ new DummyColumn("b", bType, "b-value" ));
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ final ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns);
+
+ // Create a batch
+
+ staticLoader.load(2);
+
+ // Verify
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", aType)
+ .add("b", bType)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("a-value", "b-value")
+ .addRow("a-value", "b-value")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(staticLoader.load(2)));
+ staticLoader.close();
+ }
+}
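The same loader serves Drill's file metadata ("implicit") columns. A hedged sketch of such a spec, reusing the test's DummyColumn (the column name and value here are illustrative, not Drill's actual implicit-column names):

    // Hypothetical implicit-column spec: every row repeats the file name.
    int rowCount = 2;
    ConstantColumnSpec fileCol = new DummyColumn(
        "filename",                                  // illustrative name
        MajorType.newBuilder()
            .setMinorType(MinorType.VARCHAR)
            .setMode(DataMode.REQUIRED)
            .build(),
        "employees.csv");                            // illustrative value
    ConstantColumnLoader loader = new ConstantColumnLoader(
        new ResultVectorCacheImpl(fixture.allocator()),
        java.util.Collections.singletonList(fileCol));
    VectorContainer batch = loader.load(rowCount);   // rowCount rows of "employees.csv"
    // ... consume the batch, then:
    loader.close();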
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
new file mode 100644
index 000000000..4bf5a6ad9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.Test;
+
+/**
+ * Test the mechanism that handles all-null columns during projection.
+ * An all-null column is one projected in the query, but which does
+ * not actually exist in the underlying data source (or input
+ * operator.)
+ * <p>
+ * In anticipation of having type information, this mechanism
+ * can create the classic nullable Int null column, or one of
+ * any other type and mode.
+ */
+
+public class TestNullColumnLoader extends SubOperatorTest {
+
+ private ResolvedNullColumn makeNullCol(String name, MajorType nullType) {
+
+ // For this test, we don't need the projection, so just
+ // set it to null.
+
+ return new ResolvedNullColumn(name, nullType, null, 0);
+ }
+
+ private ResolvedNullColumn makeNullCol(String name) {
+ return makeNullCol(name, null);
+ }
+
+ /**
+ * Test the simplest case: default null type, nothing in the vector
+ * cache. Specify no column type, the special NULL type, or a
+ * predefined type. Output types should be set accordingly.
+ */
+
+ @Test
+ public void testBasics() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("unspecified", null));
+ defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL)));
+ defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR)));
+ defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR)));
+ defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR)));
+
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify values and types
+
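+    // Note: "specifiedReq" was requested as required VARCHAR, but the
+    // loader (created with the allow-required flag set to false)
+    // produces it as nullable, since a required vector cannot hold nulls.
+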
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .addNullable("specifiedOpt", MinorType.VARCHAR)
+ .addNullable("specifiedReq", MinorType.VARCHAR)
+ .addArray("specifiedArray", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null, null, null, new String[] {})
+ .addRow(null, null, null, null, new String[] {})
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Test the ability to use a type other than nullable INT for null
+ * columns. This occurs, for example, in the CSV reader where no
+ * column is ever INT (nullable or otherwise) and we want our null
+ * columns to be (non-nullable) VARCHAR.
+ */
+
+ @Test
+ public void testCustomNullType() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("unspecified", null));
+ defns.add(makeNullCol("nullType", MajorType.newBuilder()
+ .setMinorType(MinorType.NULL)
+ .setMode(DataMode.OPTIONAL)
+ .build()));
+
+ // Null required is an oxymoron, so is not tested.
+ // Null type array does not make sense, so is not tested.
+
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ final MajorType nullType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("unspecified", nullType)
+ .add("nullType", nullType)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null)
+ .addRow(null, null)
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Drill requires "schema persistence": if a scan operator
+ * reads two files, F1 and F2, then the scan operator must
+ * provide the same vectors from both readers. Not just the
+ * same types, the same value vector instances (but, of course,
+ * populated with different data.)
+ * <p>
+ * Test the case in which the reader for F1 found columns
+ * (a, b, c) but F2 found only (a, b), requiring that we
+ * fill in column c with nulls, using the same type that it
+ * had in file F1. We use a vector cache to pull off this trick.
+ * This test ensures that the null column mechanism looks in that
+ * vector cache when asked to create a nullable column.
+ */
+
+ @Test
+ public void testCachedTypesMapToNullable() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("req"));
+ defns.add(makeNullCol("opt"));
+ defns.add(makeNullCol("rep"));
+ defns.add(makeNullCol("unk"));
+
+ // Populate the cache with a column of each mode.
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
+ final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
+ final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
+
+ // Use nullable Varchar for unknown null columns.
+
+ final MajorType nullType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify vectors are reused
+
+ assertSame(opt, output.getValueVector(1).getValueVector());
+ assertSame(rep, output.getValueVector(2).getValueVector());
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .addNullable("req", MinorType.FLOAT8)
+ .addNullable("opt", MinorType.FLOAT8)
+ .addArray("rep", MinorType.FLOAT8)
+ .addNullable("unk", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null, new int[] { }, null)
+ .addRow(null, null, new int[] { }, null)
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Suppose, in the previous test, that one of the columns that
+ * goes missing is a required column. The null-column mechanism can
+ * create the "null" column as a required column, then fill it with
+ * empty values (zero or "") -- if the scan operator feels doing so would
+ * be helpful.
+ */
+
+ @Test
+ public void testCachedTypesAllowRequired() {
+
+ final List<ResolvedNullColumn> defns = new ArrayList<>();
+ defns.add(makeNullCol("req"));
+ defns.add(makeNullCol("opt"));
+ defns.add(makeNullCol("rep"));
+ defns.add(makeNullCol("unk"));
+
+ // Populate the cache with a column of each mode.
+
+ final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
+ cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED));
+ final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));
+ final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED));
+
+ // Use nullable Varchar for unknown null columns.
+
+ final MajorType nullType = MajorType.newBuilder()
+ .setMinorType(MinorType.VARCHAR)
+ .setMode(DataMode.OPTIONAL)
+ .build();
+ final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true);
+
+ // Create a batch
+
+ final VectorContainer output = staticLoader.load(2);
+
+ // Verify vectors are reused
+
+ assertSame(opt, output.getValueVector(1).getValueVector());
+ assertSame(rep, output.getValueVector(2).getValueVector());
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("req", MinorType.FLOAT8)
+ .addNullable("opt", MinorType.FLOAT8)
+ .addArray("rep", MinorType.FLOAT8)
+ .addNullable("unk", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(0.0, null, new int[] { }, null)
+ .addRow(0.0, null, new int[] { }, null)
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(output));
+ staticLoader.close();
+ }
+
+ /**
+ * Test the shim class that adapts between the null column loader
+ * and the projection mechanism. The projection mechanism uses this
+ * to pull in the null columns which the null column loader has
+ * created.
+ */
+
+ @Test
+ public void testNullColumnBuilder() {
+
+ final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
+ builder.add("unspecified");
+ builder.add("nullType", Types.optional(MinorType.NULL));
+ builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR));
+ builder.add("specifiedReq", Types.required(MinorType.VARCHAR));
+ builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR));
+ builder.build(cache);
+
+ // Create a batch
+
+ builder.load(2);
+
+ // Verify values and types
+
+ final BatchSchema expectedSchema = new SchemaBuilder()
+ .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE)
+ .addNullable("specifiedOpt", MinorType.VARCHAR)
+ .addNullable("specifiedReq", MinorType.VARCHAR)
+ .addArray("specifiedArray", MinorType.VARCHAR)
+ .build();
+ final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(null, null, null, null, new String[] {})
+ .addRow(null, null, null, null, new String[] {})
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(builder.output()));
+ builder.close();
+ }
+}
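Distilled from the tests above, the lifecycle the projection mechanism drives through the NullColumnBuilder shim (a sketch using the same test fixtures):

    int rowCount = 2;
    NullColumnBuilder builder = new NullColumnBuilder(null, false);
    builder.add("missing");                                   // type chosen by the loader
    builder.add("typed", Types.optional(MinorType.VARCHAR));  // caller-specified type
    builder.build(new NullResultVectorCacheImpl(fixture.allocator()));
    builder.load(rowCount);                 // fill rowCount rows with nulls
    VectorContainer nulls = builder.output();
    // ... merge "nulls" into the final output batch, then:
    builder.close();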
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
new file mode 100644
index 000000000..cb0365ca9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+
+
+/**
+ * Test the row batch merger by merging two batches. Tests both the
+ * "direct" and "exchange" cases. Direct means that the output container
+ * contains the source vector directly: they are the same vectors.
+ * Exchange means we have two vectors, but we swap the underlying
+ * Drillbufs to effectively shift data from source to destination
+ * vector.
+ */
+
+public class TestRowBatchMerger extends SubOperatorTest {
+
+ public static class RowSetSource implements VectorSource {
+
+ private SingleRowSet rowSet;
+
+ public RowSetSource(SingleRowSet rowSet) {
+ this.rowSet = rowSet;
+ }
+
+ public RowSet rowSet() { return rowSet; }
+
+ public void clear() {
+ rowSet.clear();
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return rowSet.container().getValueVector(index).getValueVector();
+ }
+ }
+
+ public static final int SLAB_SIZE = 16 * 1024 * 1024;
+
+ @BeforeClass
+ public static void setup() {
+ // Party on 10 16MB blocks of memory to detect vector issues
+ DrillBuf bufs[] = new DrillBuf[10];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = fixture.allocator().buffer(SLAB_SIZE);
+ for (int j = 0; j < SLAB_SIZE / 4; j++) {
+ bufs[i].setInt(j * 4, 0xDEADBEEF);
+ }
+ }
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i].release();
+ }
+ }
+
+ private RowSetSource makeFirst() {
+ BatchSchema firstSchema = new SchemaBuilder()
+ .add("d", MinorType.VARCHAR)
+ .add("a", MinorType.INT)
+ .build();
+ return new RowSetSource(
+ fixture.rowSetBuilder(firstSchema)
+ .addRow("barney", 10)
+ .addRow("wilma", 20)
+ .build());
+ }
+
+ private RowSetSource makeSecond() {
+ BatchSchema secondSchema = new SchemaBuilder()
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .build();
+ return new RowSetSource(
+ fixture.rowSetBuilder(secondSchema)
+ .addRow(1, "foo.csv")
+ .addRow(2, "foo.csv")
+ .build());
+ }
+
+ public static class TestProjection extends ResolvedColumn {
+
+ public TestProjection(VectorSource source, int sourceIndex) {
+ super(source, sourceIndex);
+ }
+
+ @Override
+ public String name() { return null; }
+
+ @Override
+ public int nodeType() { return -1; }
+
+ @Override
+ public MaterializedField schema() { return null; }
+ }
+
+ @Test
+ public void testSimpleFlat() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create the second batch
+
+ RowSetSource second = makeSecond();
+
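+    // Merge (d, a) from the first batch with (b, c) from the second,
+    // projecting the output columns in the order a, b, c, d.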
+ ResolvedRow resolvedTuple = new ResolvedRow(null);
+ resolvedTuple.add(new TestProjection(first, 1));
+ resolvedTuple.add(new TestProjection(second, 0));
+ resolvedTuple.add(new TestProjection(second, 1));
+ resolvedTuple.add(new TestProjection(first, 0));
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(null, output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, 1, "foo.csv", "barney")
+ .addRow(20, 2, "foo.csv", "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ @Test
+ public void testImplicitFlat() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create the second batch
+
+ RowSetSource second = makeSecond();
+
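+    // Same merge as above, but here the resolved row itself is the
+    // vector source for the table columns; project() below binds it
+    // to the input container.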
+ ResolvedRow resolvedTuple = new ResolvedRow(null);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+ resolvedTuple.add(new TestProjection(second, 0));
+ resolvedTuple.add(new TestProjection(second, 1));
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, 1, "foo.csv", "barney")
+ .addRow(20, 2, "foo.csv", "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ @Test
+ public void testFlatWithNulls() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create null columns
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+ resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
+ resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ builder.build(cache);
+ builder.load(first.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .addNullable("null2", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, null, null, "barney")
+ .addRow(20, null, null, "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ builder.close();
+ }
+
+ /**
+   * Test the ability to create maps from whole cloth when they are
+   * requested in the projection list but not available from the data
+   * source.
+ */
+
+ @Test
+ public void testNullMaps() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create null columns
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+
+ ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
+ ResolvedTuple nullMap = nullMapCol.members();
+ nullMap.add(nullMap.nullBuilder().add("null1"));
+ nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+
+ ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
+ ResolvedTuple nullMap2 = nullMapCol2.members();
+ nullMap2.add(nullMap2.nullBuilder().add("null3"));
+ nullMap.add(nullMapCol2);
+
+ resolvedTuple.add(nullMapCol);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // LoadNulls
+
+ resolvedTuple.loadNulls(first.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ resolvedTuple.setRowCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("map1")
+ .addNullable("null1", MinorType.INT)
+ .addNullable("null2", MinorType.VARCHAR)
+ .addMap("map2")
+ .addNullable("null3", MinorType.INT)
+ .resumeMap()
+ .resumeSchema()
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, mapValue(null, null, singleMap(null)), "barney")
+ .addRow(20, mapValue(null, null, singleMap(null)), "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ resolvedTuple.close();
+ }
+
+ /**
+ * Test that the merger mechanism can rewrite a map to include
+ * projected null columns.
+ */
+
+ @Test
+ public void testMapRevision() {
+
+ // Create the first batch
+
+ BatchSchema inputSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMap("a")
+ .add("c", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSetSource input = new RowSetSource(
+ fixture.rowSetBuilder(inputSchema)
+ .addRow("barney", singleMap(10))
+ .addRow("wilma", singleMap(20))
+ .build());
+
+ // Create mappings
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+ inputSchema.getColumn(1), 1);
+ resolvedTuple.add(mapCol);
+ ResolvedTuple map = mapCol.members();
+ map.add(new TestProjection(map, 0));
+ map.add(map.nullBuilder().add("null1"));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // LoadNulls
+
+ resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(input.rowSet().container(), output);
+ output.setRecordCount(input.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMap("a")
+ .add("c", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("barney", mapValue(10, null))
+ .addRow("wilma", mapValue(20, null))
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ /**
+ * Test that the merger mechanism can rewrite a map array to include
+ * projected null columns.
+ */
+
+ @Test
+ public void testMapArrayRevision() {
+
+ // Create the first batch
+
+ BatchSchema inputSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMapArray("a")
+ .add("c", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSetSource input = new RowSetSource(
+ fixture.rowSetBuilder(inputSchema)
+ .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
+ .addRow("wilma", mapArray(singleMap(20), singleMap(21)))
+ .build());
+
+ // Create mappings
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+ inputSchema.getColumn(1), 1);
+ resolvedTuple.add(mapCol);
+ ResolvedTuple map = mapCol.members();
+ map.add(new TestProjection(map, 0));
+ map.add(map.nullBuilder().add("null1"));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // LoadNulls
+
+ resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(input.rowSet().container(), output);
+ output.setRecordCount(input.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMapArray("a")
+ .add("c", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("barney", mapArray(
+ mapValue(10, null), mapValue(11, null), mapValue(12, null)))
+ .addRow("wilma", mapArray(
+ mapValue(20, null), mapValue(21, null)))
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
new file mode 100644
index 000000000..e07ad9a62
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
+import org.apache.drill.exec.record.metadata.ProjectionType;
+import org.apache.drill.test.SubOperatorTest;
+import org.junit.Test;
+
+/**
+ * Test the projection processing done at the level of the scan as a
+ * whole: before knowledge of table "implicit" columns or of the
+ * specific table schema.
+ */
+
+public class TestScanLevelProjection extends SubOperatorTest {
+
+ /**
+ * Basic test: select a set of columns (a, b, c) when the
+   * data source has an early schema of (a, c, d). (a, c) are
+   * projected, (b) is null, and (d) is unprojected.
+ */
+
+ @Test
+ public void testBasics() {
+
+ // Simulate SELECT a, b, c ...
+ // Build the projection plan and verify
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", "b", "c"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(3, scanProj.requestedCols().size());
+ assertEquals("a", scanProj.requestedCols().get(0).rootName());
+ assertEquals("b", scanProj.requestedCols().get(1).rootName());
+ assertEquals("c", scanProj.requestedCols().get(2).rootName());
+
+ assertEquals(3, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+ assertEquals("b", scanProj.columns().get(1).name());
+ assertEquals("c", scanProj.columns().get(2).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+ }
+
+ /**
+ * Map projection occurs when a query contains project-list items with
+ * a dot, such as "a.b". We may not know the type of "b", but have
+ * just learned that "a" must be a map.
+ */
+
+ @Test
+ public void testMap() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(3, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+ assertEquals("b", scanProj.columns().get(1).name());
+ assertEquals("c", scanProj.columns().get(2).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+ // Map structure
+
+ final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+ assertTrue(a.isTuple());
+ assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x"));
+ assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y"));
+ assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z"));
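+
+    // x and y appear in the project list, so their projection type is
+    // simply "unspecified" (not yet known); z was never mentioned, so
+    // it is unprojected.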
+
+ final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element();
+ assertTrue(c.isSimple());
+ }
+
+ /**
+ * Similar to maps, if the project list contains "a[1]" then we've learned that
+ * a is an array, but we don't know what type.
+ */
+
+ @Test
+ public void testArray() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[1]", "a[3]"),
+ ScanTestUtils.parsers());
+ assertFalse(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals("a", scanProj.columns().get(0).name());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType());
+
+    // Array structure
+
+ final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element();
+ assertTrue(a.isArray());
+ assertFalse(a.hasIndex(0));
+ assertTrue(a.hasIndex(1));
+ assertFalse(a.hasIndex(2));
+ assertTrue(a.hasIndex(3));
+ }
+
+ /**
+ * Simulate a SELECT * query by passing "*" as a column name.
+ */
+
+ @Test
+ public void testWildcard() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ assertTrue(scanProj.projectAll());
+ assertFalse(scanProj.projectNone());
+ assertEquals(1, scanProj.requestedCols().size());
+ assertTrue(scanProj.requestedCols().get(0).isDynamicStar());
+
+ assertEquals(1, scanProj.columns().size());
+ assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name());
+
+ // Verify bindings
+
+ assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName());
+
+ // Verify column type
+
+ assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType());
+ }
+
+ /**
+ * Test an empty projection which occurs in a
+ * SELECT COUNT(*) query.
+ */
+
+ @Test
+ public void testEmptyProjection() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList(),
+ ScanTestUtils.parsers());
+
+ assertFalse(scanProj.projectAll());
+ assertTrue(scanProj.projectNone());
+ assertEquals(0, scanProj.requestedCols().size());
+ }
+
+ /**
+ * Can't include both a wildcard and a column name.
+ */
+
+ @Test
+ public void testErrorWildcardAndColumns() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Can't include both a column name and a wildcard.
+ */
+
+ @Test
+ public void testErrorColumnAndWildcard() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Can't include a wildcard twice.
+ * <p>
+ * Note: Drill actually allows this, but the work should be done
+ * in the project operator; scan should see at most one wildcard.
+ */
+
+ @Test
+ public void testErrorTwoWildcards() {
+ try {
+ new ScanLevelProjection(
+ RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR),
+ ScanTestUtils.parsers());
+ fail();
+ } catch (final UserException e) {
+ // Expected
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java
new file mode 100644
index 000000000..839e2caa4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java
@@ -0,0 +1,588 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.VectorSource;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * "Schema level projection" describes one side of the projection
+ * mechanism. When we project, we have the set of columns the user
+ * wants (the "schema level") and the set of columns on offer from
+ * the data source (the "scan level"). The projection mechanism
+ * combines these to map out the actual projection.
+ */
+
+public class TestSchemaLevelProjection extends SubOperatorTest {
+
+ /**
+ * Test wildcard projection: take all columns on offer from
+ * the data source, in the order that the data source specifies.
+ */
+
+ @Test
+ public void testWildcard() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .addNullable("c", MinorType.INT)
+ .addArray("d", MinorType.FLOAT8)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new WildcardSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ assertEquals("a", columns.get(0).name());
+ assertEquals(0, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+ assertEquals("c", columns.get(1).name());
+ assertEquals(1, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+ assertEquals("d", columns.get(2).name());
+ assertEquals(2, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+ }
+
+ /**
+   * Test a SELECT list with columns defined in a different order, and
+   * with name case different from that of the early-schema table.
+ */
+
+ @Test
+ public void testFullList() {
+
+ // Simulate SELECT c, b, a ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("c", "b", "a"),
+ ScanTestUtils.parsers());
+ assertEquals(3, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a, b, c)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("B", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ assertEquals("c", columns.get(0).name());
+ assertEquals(2, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+
+ assertEquals("b", columns.get(1).name());
+ assertEquals(1, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+
+ assertEquals("a", columns.get(2).name());
+ assertEquals(0, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+ }
+
+ /**
+ * Test SELECT list with columns missing from the table schema.
+ */
+
+ @Test
+ public void testMissing() {
+
+ // Simulate SELECT c, v, b, w ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("c", "v", "b", "w"),
+ ScanTestUtils.parsers());
+ assertEquals(4, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a, b, c)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("B", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(4, columns.size());
+ final VectorSource nullBuilder = rootTuple.nullBuilder();
+
+ assertEquals("c", columns.get(0).name());
+ assertEquals(2, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+
+ assertEquals("v", columns.get(1).name());
+ assertEquals(0, columns.get(1).sourceIndex());
+ assertSame(nullBuilder, columns.get(1).source());
+
+ assertEquals("b", columns.get(2).name());
+ assertEquals(1, columns.get(2).sourceIndex());
+ assertSame(rootTuple, columns.get(2).source());
+
+ assertEquals("w", columns.get(3).name());
+ assertEquals(1, columns.get(3).sourceIndex());
+ assertSame(nullBuilder, columns.get(3).source());
+ }
+
+ /**
+ * Test an explicit projection (providing columns) in which the
+ * names in the project lists are a different case than the data
+ * source, the order of columns differs, and we ask for a
+ * subset of data source columns.
+ */
+ @Test
+ public void testSubset() {
+
+ // Simulate SELECT c, a ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("c", "a"),
+ ScanTestUtils.parsers());
+ assertEquals(2, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a, b, c)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .add("B", MinorType.VARCHAR)
+ .add("C", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(2, columns.size());
+
+ assertEquals("c", columns.get(0).name());
+ assertEquals(2, columns.get(0).sourceIndex());
+ assertSame(rootTuple, columns.get(0).source());
+
+ assertEquals("a", columns.get(1).name());
+ assertEquals(0, columns.get(1).sourceIndex());
+ assertSame(rootTuple, columns.get(1).source());
+ }
+
+ /**
+ * Drill is unique in that we can select (a, b) from a data source
+ * that only offers (c, d). We get null columns as a result.
+ */
+
+ @Test
+ public void testDisjoint() {
+
+    // Simulate SELECT b ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("b"),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(1, columns.size());
+ final VectorSource nullBuilder = rootTuple.nullBuilder();
+
+ assertEquals("b", columns.get(0).name());
+ assertEquals(0, columns.get(0).sourceIndex());
+ assertSame(nullBuilder, columns.get(0).source());
+ }
+
+ /**
+   * Test the obscure case in which the project list asks for a member
+   * of a map (b.c.d) that the data source does not contain at all. The
+   * output should be a map "b", created from whole cloth, holding only
+   * the requested (null) members.
+ */
+
+ @Test
+ public void testOmittedMap() {
+
+ // Simulate SELECT a, b.c.d ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a", "b.c.d"),
+ ScanTestUtils.parsers());
+ assertEquals(2, scanProj.columns().size());
+ {
+ assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType());
+ final UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1));
+ assertTrue(bCol.element().isTuple());
+ }
+
+ // Simulate a data source, with early schema, of (a)
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(2, columns.size());
+
+ // Should have resolved a to a table column, b to a missing map.
+
+ // A is projected
+
+ final ResolvedColumn aCol = columns.get(0);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+
+ // B is not projected, is implicitly a map
+
+ final ResolvedColumn bCol = columns.get(1);
+ assertEquals("b", bCol.name());
+ assertEquals(ResolvedMapColumn.ID, bCol.nodeType());
+
+ final ResolvedMapColumn bMap = (ResolvedMapColumn) bCol;
+ final ResolvedTuple bMembers = bMap.members();
+ assertNotNull(bMembers);
+ assertEquals(1, bMembers.columns().size());
+
+ // C is a map within b
+
+ final ResolvedColumn cCol = bMembers.columns().get(0);
+ assertEquals(ResolvedMapColumn.ID, cCol.nodeType());
+
+ final ResolvedMapColumn cMap = (ResolvedMapColumn) cCol;
+ final ResolvedTuple cMembers = cMap.members();
+ assertNotNull(cMembers);
+ assertEquals(1, cMembers.columns().size());
+
+ // D is an unknown column type (not a map)
+
+ final ResolvedColumn dCol = cMembers.columns().get(0);
+ assertEquals(ResolvedNullColumn.ID, dCol.nodeType());
+ }
+
+ /**
+ * Test of a map with missing columns.
+ * table of (a{b, c}), project a.c, a.d, a.e.f
+ */
+
+ @Test
+ public void testOmittedMapMembers() {
+
+    // Simulate SELECT x, a.c, a.d, a.e.f, y ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"),
+ ScanTestUtils.parsers());
+ assertEquals(3, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (x, y, a{b, c})
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("x", MinorType.VARCHAR)
+ .add("y", MinorType.INT)
+ .addMap("a")
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .resumeSchema()
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(3, columns.size());
+
+ // Should have resolved a.c to a member of map a,
+ // a.d to a null member, and a.e.f to a null member
+ // of the implicit nested map a.e
+
+ // X is projected
+
+ final ResolvedColumn xCol = columns.get(0);
+ assertEquals("x", xCol.name());
+ assertEquals(ResolvedTableColumn.ID, xCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source());
+ assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex());
+
+ // Y is projected
+
+ final ResolvedColumn yCol = columns.get(2);
+ assertEquals("y", yCol.name());
+ assertEquals(ResolvedTableColumn.ID, yCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source());
+ assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex());
+
+ // A is projected
+
+ final ResolvedColumn aCol = columns.get(1);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedMapColumn.ID, aCol.nodeType());
+
+ final ResolvedMapColumn aMap = (ResolvedMapColumn) aCol;
+ final ResolvedTuple aMembers = aMap.members();
+ assertNotNull(aMembers);
+ assertFalse(aMembers.isSimpleProjection());
+ assertEquals(3, aMembers.columns().size());
+
+ // a.c is projected
+
+ final ResolvedColumn acCol = aMembers.columns().get(0);
+ assertEquals("c", acCol.name());
+ assertEquals(ResolvedTableColumn.ID, acCol.nodeType());
+ assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex());
+
+ // a.d is not in the table, is null
+
+ final ResolvedColumn adCol = aMembers.columns().get(1);
+ assertEquals("d", adCol.name());
+ assertEquals(ResolvedNullColumn.ID, adCol.nodeType());
+
+ // a.e is not in the table, is implicitly a map
+
+ final ResolvedColumn aeCol = aMembers.columns().get(2);
+ assertEquals("e", aeCol.name());
+ assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
+
+ final ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
+ final ResolvedTuple aeMembers = aeMap.members();
+ assertNotNull(aeMembers);
+ assertFalse(aeMembers.isSimpleProjection());
+ assertEquals(1, aeMembers.columns().size());
+
+ // a.e.f is a null column
+
+ final ResolvedColumn aefCol = aeMembers.columns().get(0);
+ assertEquals("f", aefCol.name());
+ assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
+ }
+
+ /**
+ * Simple map project. This is an internal case in which the
+ * query asks for a set of columns inside a map, and the table
+ * loader produces exactly that set. No special projection is
+ * needed, the map is projected as a whole.
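+ * <p>
+ * For example, as exercised below: <tt>SELECT a.b, a.c</tt> against a
+ * table of <tt>(a{b, c})</tt> resolves to the single table column
+ * <tt>a</tt>, copied across as one vector.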
+ */
+
+ @Test
+ public void testSimpleMapProject() {
+
+ // Simulate SELECT a.b, a.c ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.b", "a.c"),
+ ScanTestUtils.parsers());
+ assertEquals(1, scanProj.columns().size());
+
+ // Simulate a data source, with early schema, of (a{b, c})
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .addMap("a")
+ .add("b", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .resumeSchema()
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(1, columns.size());
+
+ // Should have resolved a to a single table column:
+ // the map is projected as a whole, not member by
+ // member
+
+ // a is projected as a vector, not as a structured map
+
+ final ResolvedColumn aCol = columns.get(0);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+ assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+ }
+
+ /**
+ * Project of a non-map as a map
+ */
+
+ @Test
+ public void testMapMismatch() {
+
+ // Simulate SELECT a.b ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a.b"),
+ ScanTestUtils.parsers());
+
+ // Simulate a data source, with early schema, of (a)
+ // where a is not a map.
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ try {
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final UserException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Test project of an array. At the scan level, we just verify
+ * that the requested column is, indeed, an array.
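+ * <p>
+ * That is, <tt>SELECT a[0]</tt> against a table in which <tt>a</tt>
+ * is a repeated VARCHAR resolves to the whole array column
+ * <tt>a</tt>; the index itself is not applied at the scan level.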
+ */
+
+ @Test
+ public void testArrayProject() {
+
+ // Simulate SELECT a[0] ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[0]"),
+ ScanTestUtils.parsers());
+
+ // Simulate a data source, with early schema, of (a)
+ // where a is an array.
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .addArray("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals(1, columns.size());
+
+ final ResolvedColumn aCol = columns.get(0);
+ assertEquals("a", aCol.name());
+ assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+ assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+ assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+ }
+
+ /**
+ * Project of a non-array as an array
+ */
+
+ @Test
+ public void testArrayMismatch() {
+
+ // Simulate SELECT a[0] ...
+
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectList("a[0]"),
+ ScanTestUtils.parsers());
+
+ // Simulate a data source, with early schema, of (a)
+ // where a is not an array.
+
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.VARCHAR)
+ .buildSchema();
+
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ try {
+ new ExplicitSchemaProjection(
+ scanProj, tableSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final UserException e) {
+ // Expected
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
new file mode 100644
index 000000000..977cd216b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests schema smoothing at the schema projection level.
+ * This level handles reusing prior types when filling null
+ * values. But, because no actual vectors are involved, it
+ * does not handle the schema chosen for a table ahead of
+ * time, only the schema as it is merged with prior schema to
+ * detect missing columns.
+ * <p>
+ * Focuses on the <tt>SmoothingProjection</tt> class itself.
+ * <p>
+ * Note that, at present, schema smoothing does not work for entire
+ * maps. That is, if file 1 has, say <tt>{a: {b: 10, c: "foo"}}</tt>
+ * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
+ * not currently know how to recreate the map. The same is true of
+ * lists and unions. Handling such cases is complex and is probably
+ * better handled via a system that allows the user to specify their
+ * intent by providing a schema to apply to the two files.
+ * <p>
+ * Note that schema smoothing itself is an experimental work-around
+ * to a fundamental limitation in Drill:
+ * <ul>
+ * <li>Drill cannot predict the future: each file (or batch)
+ * may have a different schema.</li>
+ * <li>Drill does not know about these differences until they
+ * occur.</li>
+ * <li>The scan operator is obligated to return the same schema
+ * (and same vectors) from one file to the next, else a "hard"
+ * schema change is sent downstream.</li>
+ * </ul>
+ *
+ * The problem is actually intractable. The schema smoother handles the
+ * cases that can be handled, such as required --> nullable, a column
+ * disappearing, etc. This whole mechanism should be scrapped if/when
+ * Drill can work with schemas. Or, keep this to handle, as best we can,
+ * odd schemas, but insist on a schema to resolve issues that this
+ * mechanism cannot handle (and that, indeed, no algorithm could handle
+ * because such an algorithm would require time-travel: looking into
+ * the future to know what data will be scanned).
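+ * <p>
+ * A minimal sketch of the mechanism (schemas here are illustrative;
+ * the tests below exercise the real cases):
+ * <pre>
+ * File 1: (a: nullable BIGINT, b: nullable VARCHAR) -> output (a, b)
+ * File 2: (a: nullable BIGINT)                      -> output (a, b);
+ *         b is filled with nulls, no downstream schema change
+ * File 3: (a: FLOAT8)                               -> type conflict;
+ *         prior schema discarded, hard schema change
+ * </pre>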
+ */
+
+public class TestSchemaSmoothing extends SubOperatorTest {
+
+ /**
+ * Low-level test of the smoothing projection, including the exceptions
+ * it throws when things are not going its way.
+ */
+
+ @Test
+ public void testSmoothingProjection() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+ final TupleMetadata schema1 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ ResolvedRow priorSchema;
+ {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new WildcardSchemaProjection(
+ scanProj, schema1, rootTuple,
+ ScanTestUtils.resolvers());
+ priorSchema = rootTuple;
+ }
+
+ // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+ final TupleMetadata schema2 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema2, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ priorSchema = rootTuple;
+ } catch (final IncompatibleSchemaException e) {
+ fail();
+ }
+
+ // Table 3: (a, b, c, d), column added, must replan schema
+
+ final TupleMetadata schema3 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .add("d", MinorType.INT)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema3, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final IncompatibleSchemaException e) {
+ // Expected
+ }
+
+ // Table 4: (a: double), type changed, must replan schema
+
+ final TupleMetadata schema4 = new SchemaBuilder()
+ .addNullable("a", MinorType.FLOAT8)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema4, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final IncompatibleSchemaException e) {
+ // Expected
+ }
+
+ // Table 5: Drop a non-nullable column, must replan
+
+ final TupleMetadata schema5 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ try {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ new SmoothingProjection(
+ scanProj, schema5, priorSchema, rootTuple,
+ ScanTestUtils.resolvers());
+ fail();
+ } catch (final IncompatibleSchemaException e) {
+ // Expected
+ }
+ }
+
+ /**
+ * Case in which the table schema is a superset of the prior
+ * schema. Discard prior schema. Turn off auto expansion of
+ * metadata for a simpler test.
+ */
+
+ @Test
+ public void testSmaller() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ smoother.resolve(priorSchema, rootTuple);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ }
+ {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ smoother.resolve(tableSchema, rootTuple);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ /**
+ * Case in which the table schema and prior are disjoint
+ * sets. Discard the prior schema.
+ */
+
+ @Test
+ public void testDisjoint() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
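+ // Helper for the tests below: runs a single smoothing pass over the
+ // given table schema and returns the resulting resolved row.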
+ private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
+ final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ final ResolvedRow rootTuple = new ResolvedRow(builder);
+ smoother.resolve(schema, rootTuple);
+ return rootTuple;
+ }
+
+ /**
+ * Column names match, but types differ. Discard the prior schema.
+ */
+
+ @Test
+ public void testDifferentTypes() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ /**
+ * The prior and table schemas are identical. Preserve the prior
+ * schema (though the output is no different than if we had discarded
+ * the prior schema).
+ */
+
+ @Test
+ public void testSameSchemas() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
+ assertTrue(actualSchema.isEquivalent(tableSchema));
+ assertTrue(actualSchema.isEquivalent(priorSchema));
+ }
+ }
+
+ /**
+ * The prior and table schemas are identical, but the case of the names differs.
+ * Preserve the case of the first schema.
+ */
+
+ @Test
+ public void testDifferentCase() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("A", MinorType.INT)
+ .add("B", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ final List<ResolvedColumn> columns = rootTuple.columns();
+ assertEquals("a", columns.get(0).name());
+ }
+ }
+
+ /**
+ * Can't preserve the prior schema if it had required columns
+ * that the new schema does not provide.
+ */
+
+ @Test
+ public void testRequired() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(2, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+ }
+ }
+
+ /**
+ * Preserve the prior schema if the table is a subset and the missing
+ * columns are nullable or repeated.
+ */
+
+ @Test
+ public void testMissingNullableColumns() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .addNullable("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addArray("c", MinorType.BIGINT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ }
+ }
+
+ /**
+ * Preserve the prior schema if the table is a subset. Map the table
+ * columns to the output using the prior schema ordering.
+ */
+
+ @Test
+ public void testReordering() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ final TupleMetadata priorSchema = new SchemaBuilder()
+ .addNullable("a", MinorType.INT)
+ .add("b", MinorType.VARCHAR)
+ .addArray("c", MinorType.BIGINT)
+ .buildSchema();
+ final TupleMetadata tableSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addNullable("a", MinorType.INT)
+ .buildSchema();
+
+ {
+ doResolve(smoother, priorSchema);
+ }
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+ assertEquals(1, smoother.schemaVersion());
+ assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+ }
+ }
+
+ /**
+ * Integrated test across multiple schemas at the batch level.
+ */
+
+ @Test
+ public void testSmoothableSchemaBatches() {
+ final ScanLevelProjection scanProj = new ScanLevelProjection(
+ RowSetTestUtils.projectAll(),
+ ScanTestUtils.parsers());
+
+ final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+ ScanTestUtils.resolvers());
+
+ // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+ final TupleMetadata schema1 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema1);
+
+ // Just use the original schema.
+
+ assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(1, smoother.schemaVersion());
+ }
+
+ // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+ final TupleMetadata schema2 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .add("c", MinorType.FLOAT8)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema2);
+ assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(1, smoother.schemaVersion());
+ }
+
+ // Table 3: (a, b, c, d), column added, must replan schema
+
+ final TupleMetadata schema3 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .add("c", MinorType.FLOAT8)
+ .add("d", MinorType.INT)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema3);
+ assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(2, smoother.schemaVersion());
+ }
+
+ // Table 4: Drop a non-nullable column, must replan
+
+ final TupleMetadata schema4 = new SchemaBuilder()
+ .addNullable("a", MinorType.BIGINT)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema4);
+ assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(3, smoother.schemaVersion());
+ }
+
+ // Table 5: (a: double), type changed, must replan schema
+
+ final TupleMetadata schema5 = new SchemaBuilder()
+ .addNullable("a", MinorType.FLOAT8)
+ .addNullable("b", MinorType.VARCHAR)
+ .buildSchema();
+ {
+ final ResolvedRow rootTuple = doResolve(smoother, schema5);
+ assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
+ assertEquals(4, smoother.schemaVersion());
+ }
+ }
+
+ /**
+ * A SELECT * query uses the schema of the table as the output schema.
+ * This is trivial when the scanner has one table. But, if two or more
+ * tables occur, then things get interesting. The first table sets the
+ * schema. The second table then has:
+ * <ul>
+ * <li>The same schema, trivial case.</li>
+ * <li>A subset of the first table. The type of the "missing" column
+ * from the first table is used for a null column in the second table.</li>
+ * <li>A superset or disjoint set of the first schema. This triggers a hard schema
+ * change.</li>
+ * </ul>
+ * <p>
+ * It is an open question whether previous columns should be preserved on
+ * a hard reset. For now, the code implements, and this test verifies, that a
+ * hard reset clears the "memory" of prior schemas.
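+ * <p>
+ * Concretely, the batches below should behave as follows: tables 1
+ * and 2 share a schema (same schema version); table 3 is a subset,
+ * so its missing column is null-filled and the version is unchanged;
+ * table 4 is disjoint, so the schema version changes.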
+ */
+
+ @Test
+ public void testWildcardSmoothing() {
+ final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
+ projector.enableSchemaSmoothing(true);
+ projector.build(RowSetTestUtils.projectAll());
+
+ final TupleMetadata firstSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .addNullable("c", MinorType.BIGINT)
+ .buildSchema();
+ final TupleMetadata subsetSchema = new SchemaBuilder()
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .add("a", MinorType.INT)
+ .buildSchema();
+ final TupleMetadata disjointSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("b", MinorType.VARCHAR, 10)
+ .add("d", MinorType.VARCHAR)
+ .buildSchema();
+
+ final SchemaTracker tracker = new SchemaTracker();
+ int schemaVersion;
+ {
+ // First table, establishes the baseline
+ // ... FROM table 1
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow(10, "fred", 110L)
+ .addRow(20, "wilma", 110L);
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ schemaVersion = tracker.schemaVersion();
+
+ final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+ .addRow(10, "fred", 110L)
+ .addRow(20, "wilma", 110L)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+ {
+ // Second table, same schema, the trivial case
+ // ... FROM table 2
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow(70, "pebbles", 770L)
+ .addRow(80, "hoppy", 880L);
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ assertEquals(schemaVersion, tracker.schemaVersion());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+ .addRow(70, "pebbles", 770L)
+ .addRow(80, "hoppy", 880L)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+ {
+ // Third table: subset schema of first two
+ // ... FROM table 3
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow("bambam", 30)
+ .addRow("betty", 40);
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ assertEquals(schemaVersion, tracker.schemaVersion());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+ .addRow(30, "bambam", null)
+ .addRow(40, "betty", null)
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+ {
+ // Fourth table: disjoint schema, causes a schema reset
+ // ... FROM table 4
+
+ final ReaderSchemaOrchestrator reader = projector.startReader();
+ final ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
+
+ reader.startBatch();
+ loader.writer()
+ .addRow(50, "dino", "supporting")
+ .addRow(60, "barney", "main");
+ reader.endBatch();
+
+ tracker.trackSchema(projector.output());
+ assertNotEquals(schemaVersion, tracker.schemaVersion());
+
+ final SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
+ .addRow(50, "dino", "supporting")
+ .addRow(60, "barney", "main")
+ .build();
+ new RowSetComparison(expected)
+ .verifyAndClearAll(fixture.wrap(projector.output()));
+ }
+
+ projector.close();
+ }
+
+ // TODO: Test schema smoothing with repeated
+ // TODO: Test hard schema change
+ // TODO: Typed null column tests (resurrect)
+ // TODO: Test maps and arrays of maps
+}