author | Paul Rogers <progers@cloudera.com> | 2018-10-11 22:47:49 -0700
---|---|---
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-12-10 14:03:05 +0200
commit | 75b9a788df6d6ea5e73cfedff9e2b47acc27b684 (patch) |
tree | 68f5dac1edda202c95dcbca6c2c4d109ddb16b3d /exec/java-exec/src/test/java/org/apache |
parent | 31ba50fe332df04f38d16906d0be2a69790a342f (diff) |
DRILL-6791: Scan projection framework
The "schema projection" mechanism:
* Handles none (SELECT COUNT\(*)), some (SELECT a, b, x) and all (SELECT *) projection.
* Handles null columns (for projection a column "x" that does not exist in the base table.)
* Handles constant columns as used for file metadata (AKA "implicit" columns).
* Handle schema persistence: the need to reuse the same vectors across different scanners
* Provides a framework for consuming externally-supplied metadata
* Since we don't yet have a way to provide "real" metadata, obtains metadata hints from
previous batches and from the projection list (a.b implies that "a" is a map, c[0]
implies that "c" is an array, etc.)
* Handles merging the set of data source columns and null columns to create the final output batch.
* Running tests found a failure due to an uninialized "bits" vector. Added code to explicitly fill
the bits vectors with zeros in the "result set loader."
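
Before wading into the diff, the two-phase projection flow the new tests exercise looks
roughly like this. The sketch below is distilled from TestScanLevelProjection and
TestSchemaLevelProjection in this patch; the class and method names come from the tests,
while the surrounding harness (fixture, assertions) is assumed from SubOperatorTest.

```java
// Sketch of the projection round trip, following TestSchemaLevelProjection.testMissing().

// Scan level: parse the project list (SELECT c, v, b, w) before any
// table schema is known.
final ScanLevelProjection scanProj = new ScanLevelProjection(
    RowSetTestUtils.projectList("c", "v", "b", "w"),
    ScanTestUtils.parsers());

// Schema level: the reader offers an early schema of (A, B, C).
final TupleMetadata tableSchema = new SchemaBuilder()
    .add("A", MinorType.VARCHAR)
    .add("B", MinorType.VARCHAR)
    .add("C", MinorType.VARCHAR)
    .buildSchema();

// Resolve the two: "c" and "b" map to table columns (case-insensitively);
// "v" and "w" become null columns materialized via the NullColumnBuilder.
final NullColumnBuilder builder = new NullColumnBuilder(null, false);
final ResolvedRow rootTuple = new ResolvedRow(builder);
new ExplicitSchemaProjection(
    scanProj, tableSchema, rootTuple,
    ScanTestUtils.resolvers());

// All four requested columns appear in the output, in SELECT order.
assertEquals(4, rootTuple.columns().size());
```

The scan-level parse runs once per scan; the schema-level resolution runs once per reader,
which is what lets wildcard, null, and implicit columns all funnel into the same merge step.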
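
The "schema persistence" bullet is the subtlest of these: when reader F2 no longer sees a
column that reader F1 provided, the scan must hand downstream operators the same value
vector instance, now filled with nulls. A trimmed sketch of how
TestNullColumnLoader.testCachedTypesMapToNullable() verifies this (the test builds its
null-column definitions through a helper; here the ResolvedNullColumn constructor from the
test is used directly, and the fixture allocator is assumed):

```java
// One null column to materialize: "opt" existed in file F1 but not in F2.
final List<ResolvedNullColumn> defns = new ArrayList<>();
defns.add(new ResolvedNullColumn("opt", null, null, 0));

// The vector cache remembers the vector F1 created for "opt".
final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator());
final ValueVector opt = cache.addOrGet(
    SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL));

// Load two rows of nulls for the missing column...
final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false);
final VectorContainer output = staticLoader.load(2);

// ...into the very same vector instance, preserving schema across readers.
assertSame(opt, output.getValueVector(0).getValueVector());
staticLoader.close();
```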
Diffstat (limited to 'exec/java-exec/src/test/java/org/apache')
7 files changed, 2496 insertions, 0 deletions
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java new file mode 100644 index 000000000..c69357a71 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/ScanTestUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan; + +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser; +import org.apache.drill.exec.physical.impl.scan.project.SchemaLevelProjection.SchemaProjectionResolver; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.exec.record.metadata.TupleSchema; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; + +public class ScanTestUtils { + + /** + * Type-safe way to define a list of parsers. + * @param parsers as a varArgs list convenient for testing + * @return parsers as a Java List for input to the scan + * projection framework + */ + + public static List<ScanProjectionParser> parsers(ScanProjectionParser... parsers) { + return ImmutableList.copyOf(parsers); + } + + public static List<SchemaProjectionResolver> resolvers(SchemaProjectionResolver... resolvers) { + return ImmutableList.copyOf(resolvers); + } + + public static TupleMetadata schema(ResolvedTuple output) { + final TupleMetadata schema = new TupleSchema(); + for (final ResolvedColumn col : output.columns()) { + MaterializedField field = col.schema(); + if (field.getType() == null) { + + // Convert from internal format of null columns (unset type) + // to a usable form (explicit minor type of NULL.) + + field = MaterializedField.create(field.getName(), + Types.optional(MinorType.NULL)); + } + schema.add(field); + } + return schema; + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java new file mode 100644 index 000000000..c524577d9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestConstantColumnLoader.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.project.ConstantColumnLoader.ConstantColumnSpec; +import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.junit.Test; + +/** + * Drill allows file metadata columns (also called "implicit" columns.) + * These are columns that contain a long repeated sequences of the same + * values. The ConstantColumnLoader builds and populates these columns. + */ + +public class TestConstantColumnLoader extends SubOperatorTest { + + private static class DummyColumn implements ConstantColumnSpec { + + private final String name; + private final MaterializedField schema; + private final String value; + + public DummyColumn(String name, MajorType type, String value) { + this.name = name; + this.schema = MaterializedField.create(name, type); + this.value = value; + } + + @Override + public String name() { return name; } + + @Override + public MaterializedField schema() { return schema; } + + @Override + public String value() { return value; } + } + + /** + * Test the static column loader using one column of each type. + * The null column is of type int, but the associated value is of + * type string. This is a bit odd, but works out because we detect that + * the string value is null and call setNull on the writer, and avoid + * using the actual data. 
+ */ + + @Test + public void testConstantColumnLoader() { + + final MajorType aType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.REQUIRED) + .build(); + final MajorType bType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + + final List<ConstantColumnSpec> defns = new ArrayList<>(); + defns.add( + new DummyColumn("a", aType, "a-value" )); + defns.add( + new DummyColumn("b", bType, "b-value" )); + + final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); + final ConstantColumnLoader staticLoader = new ConstantColumnLoader(cache, defns); + + // Create a batch + + staticLoader.load(2); + + // Verify + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("a", aType) + .add("b", bType) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("a-value", "b-value") + .addRow("a-value", "b-value") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(staticLoader.load(2))); + staticLoader.close(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java new file mode 100644 index 000000000..4bf5a6ad9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestNullColumnLoader.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.scan.project; + +import static org.junit.Assert.assertSame; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnLoader; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl; +import org.apache.drill.exec.physical.rowSet.impl.ResultVectorCacheImpl; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.Test; + +/** + * Test the mechanism that handles all-null columns during projection. + * An all-null column is one projected in the query, but which does + * not actually exist in the underlying data source (or input + * operator.) + * <p> + * In anticipation of having type information, this mechanism + * can create the classic nullable Int null column, or one of + * any other type and mode. + */ + +public class TestNullColumnLoader extends SubOperatorTest { + + private ResolvedNullColumn makeNullCol(String name, MajorType nullType) { + + // For this test, we don't need the projection, so just + // set it to null. + + return new ResolvedNullColumn(name, nullType, null, 0); + } + + private ResolvedNullColumn makeNullCol(String name) { + return makeNullCol(name, null); + } + + /** + * Test the simplest case: default null type, nothing in the vector + * cache. Specify no column type, the special NULL type, or a + * predefined type. Output types should be set accordingly. 
+ */ + + @Test + public void testBasics() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("unspecified", null)); + defns.add(makeNullCol("nullType", Types.optional(MinorType.NULL))); + defns.add(makeNullCol("specifiedOpt", Types.optional(MinorType.VARCHAR))); + defns.add(makeNullCol("specifiedReq", Types.required(MinorType.VARCHAR))); + defns.add(makeNullCol("specifiedArray", Types.repeated(MinorType.VARCHAR))); + + final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, null, false); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE) + .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE) + .addNullable("specifiedOpt", MinorType.VARCHAR) + .addNullable("specifiedReq", MinorType.VARCHAR) + .addArray("specifiedArray", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null, null, null, new String[] {}) + .addRow(null, null, null, null, new String[] {}) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Test the ability to use a type other than nullable INT for null + * columns. This occurs, for example, in the CSV reader where no + * column is ever INT (nullable or otherwise) and we want our null + * columns to be (non-nullable) VARCHAR. + */ + + @Test + public void testCustomNullType() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("unspecified", null)); + defns.add(makeNullCol("nullType", MajorType.newBuilder() + .setMinorType(MinorType.NULL) + .setMode(DataMode.OPTIONAL) + .build())); + + // Null required is an oxymoron, so is not tested. + // Null type array does not make sense, so is not tested. + + final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + final MajorType nullType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("unspecified", nullType) + .add("nullType", nullType) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null) + .addRow(null, null) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Drill requires "schema persistence": if a scan operator + * reads two files, F1 and F2, then the scan operator must + * provide the same vectors from both readers. Not just the + * same types, the same value vector instances (but, of course, + * populated with different data.) + * <p> + * Test the case in which the reader for F1 found columns + * (a, b, c) but, F2 found only (a, b), requiring that we + * fill in column c, filled with nulls, but of the same type that it + * was in file F1. We use a vector cache to pull off this trick. + * This test ensures that the null column mechanism looks in that + * vector cache when asked to create a nullable column. 
+ */ + + @Test + public void testCachedTypesMapToNullable() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("req")); + defns.add(makeNullCol("opt")); + defns.add(makeNullCol("rep")); + defns.add(makeNullCol("unk")); + + // Populate the cache with a column of each mode. + + final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); + cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED)); + final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL)); + final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED)); + + // Use nullable Varchar for unknown null columns. + + final MajorType nullType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, false); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify vectors are reused + + assertSame(opt, output.getValueVector(1).getValueVector()); + assertSame(rep, output.getValueVector(2).getValueVector()); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .addNullable("req", MinorType.FLOAT8) + .addNullable("opt", MinorType.FLOAT8) + .addArray("rep", MinorType.FLOAT8) + .addNullable("unk", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null, new int[] { }, null) + .addRow(null, null, new int[] { }, null) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Suppose, in the previous test, that one of the columns that + * goes missing is a required column. The null-column mechanism can + * create the "null" column as a required column, then fill it with + * empty values (zero or "") -- if the scan operator feels doing so would + * be helpful. + */ + + @Test + public void testCachedTypesAllowRequired() { + + final List<ResolvedNullColumn> defns = new ArrayList<>(); + defns.add(makeNullCol("req")); + defns.add(makeNullCol("opt")); + defns.add(makeNullCol("rep")); + defns.add(makeNullCol("unk")); + + // Populate the cache with a column of each mode. + + final ResultVectorCacheImpl cache = new ResultVectorCacheImpl(fixture.allocator()); + cache.addOrGet(SchemaBuilder.columnSchema("req", MinorType.FLOAT8, DataMode.REQUIRED)); + final ValueVector opt = cache.addOrGet(SchemaBuilder.columnSchema("opt", MinorType.FLOAT8, DataMode.OPTIONAL)); + final ValueVector rep = cache.addOrGet(SchemaBuilder.columnSchema("rep", MinorType.FLOAT8, DataMode.REPEATED)); + + // Use nullable Varchar for unknown null columns. 
+ + final MajorType nullType = MajorType.newBuilder() + .setMinorType(MinorType.VARCHAR) + .setMode(DataMode.OPTIONAL) + .build(); + final NullColumnLoader staticLoader = new NullColumnLoader(cache, defns, nullType, true); + + // Create a batch + + final VectorContainer output = staticLoader.load(2); + + // Verify vectors are reused + + assertSame(opt, output.getValueVector(1).getValueVector()); + assertSame(rep, output.getValueVector(2).getValueVector()); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("req", MinorType.FLOAT8) + .addNullable("opt", MinorType.FLOAT8) + .addArray("rep", MinorType.FLOAT8) + .addNullable("unk", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(0.0, null, new int[] { }, null) + .addRow(0.0, null, new int[] { }, null) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(output)); + staticLoader.close(); + } + + /** + * Test the shim class that adapts between the null column loader + * and the projection mechanism. The projection mechanism uses this + * to pull in the null columns which the null column loader has + * created. + */ + + @Test + public void testNullColumnBuilder() { + + final ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + + builder.add("unspecified"); + builder.add("nullType", Types.optional(MinorType.NULL)); + builder.add("specifiedOpt", Types.optional(MinorType.VARCHAR)); + builder.add("specifiedReq", Types.required(MinorType.VARCHAR)); + builder.add("specifiedArray", Types.repeated(MinorType.VARCHAR)); + builder.build(cache); + + // Create a batch + + builder.load(2); + + // Verify values and types + + final BatchSchema expectedSchema = new SchemaBuilder() + .add("unspecified", NullColumnLoader.DEFAULT_NULL_TYPE) + .add("nullType", NullColumnLoader.DEFAULT_NULL_TYPE) + .addNullable("specifiedOpt", MinorType.VARCHAR) + .addNullable("specifiedReq", MinorType.VARCHAR) + .addArray("specifiedArray", MinorType.VARCHAR) + .build(); + final SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(null, null, null, null, new String[] {}) + .addRow(null, null, null, null, new String[] {}) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(fixture.wrap(builder.output())); + builder.close(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java new file mode 100644 index 000000000..cb0365ca9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.impl.scan.project.VectorSource; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.DrillBuf; + +import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; +import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray; + + +/** + * Test the row batch merger by merging two batches. Tests both the + * "direct" and "exchange" cases. Direct means that the output container + * contains the source vector directly: they are the same vectors. + * Exchange means we have two vectors, but we swap the underlying + * Drillbufs to effectively shift data from source to destination + * vector. 
+ */ + +public class TestRowBatchMerger extends SubOperatorTest { + + public static class RowSetSource implements VectorSource { + + private SingleRowSet rowSet; + + public RowSetSource(SingleRowSet rowSet) { + this.rowSet = rowSet; + } + + public RowSet rowSet() { return rowSet; } + + public void clear() { + rowSet.clear(); + } + + @Override + public ValueVector vector(int index) { + return rowSet.container().getValueVector(index).getValueVector(); + } + } + + public static final int SLAB_SIZE = 16 * 1024 * 1024; + + @BeforeClass + public static void setup() { + // Party on 10 16MB blocks of memory to detect vector issues + DrillBuf bufs[] = new DrillBuf[10]; + for (int i = 0; i < bufs.length; i++) { + bufs[i] = fixture.allocator().buffer(SLAB_SIZE); + for (int j = 0; j < SLAB_SIZE / 4; j++) { + bufs[i].setInt(j * 4, 0xDEADBEEF); + } + } + for (int i = 0; i < bufs.length; i++) { + bufs[i].release(); + } + } + + private RowSetSource makeFirst() { + BatchSchema firstSchema = new SchemaBuilder() + .add("d", MinorType.VARCHAR) + .add("a", MinorType.INT) + .build(); + return new RowSetSource( + fixture.rowSetBuilder(firstSchema) + .addRow("barney", 10) + .addRow("wilma", 20) + .build()); + } + + private RowSetSource makeSecond() { + BatchSchema secondSchema = new SchemaBuilder() + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .build(); + return new RowSetSource( + fixture.rowSetBuilder(secondSchema) + .addRow(1, "foo.csv") + .addRow(2, "foo.csv") + .build()); + } + + public static class TestProjection extends ResolvedColumn { + + public TestProjection(VectorSource source, int sourceIndex) { + super(source, sourceIndex); + } + + @Override + public String name() { return null; } + + @Override + public int nodeType() { return -1; } + + @Override + public MaterializedField schema() { return null; } + } + + @Test + public void testSimpleFlat() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create the second batch + + RowSetSource second = makeSecond(); + + ResolvedRow resolvedTuple = new ResolvedRow(null); + resolvedTuple.add(new TestProjection(first, 1)); + resolvedTuple.add(new TestProjection(second, 0)); + resolvedTuple.add(new TestProjection(second, 1)); + resolvedTuple.add(new TestProjection(first, 0)); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(null, output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, 1, "foo.csv", "barney") + .addRow(20, 2, "foo.csv", "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + @Test + public void testImplicitFlat() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create the second batch + + RowSetSource second = makeSecond(); + + ResolvedRow resolvedTuple = new ResolvedRow(null); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + resolvedTuple.add(new TestProjection(second, 0)); + resolvedTuple.add(new TestProjection(second, 1)); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + 
output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, 1, "foo.csv", "barney") + .addRow(20, 2, "foo.csv", "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + @Test + public void testFlatWithNulls() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create null columns + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + + ResolvedRow resolvedTuple = new ResolvedRow(builder); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + resolvedTuple.add(resolvedTuple.nullBuilder().add("null1")); + resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + builder.build(cache); + builder.load(first.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("null1", MinorType.INT) + .addNullable("null2", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, null, null, "barney") + .addRow(20, null, null, "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + builder.close(); + } + + /** + * Test the ability to create maps from whole cloth if requested in + * the projection list, and the map is not available from the data + * source. 
+ */ + + @Test + public void testNullMaps() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create null columns + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + + ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1"); + ResolvedTuple nullMap = nullMapCol.members(); + nullMap.add(nullMap.nullBuilder().add("null1")); + nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); + + ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2"); + ResolvedTuple nullMap2 = nullMapCol2.members(); + nullMap2.add(nullMap2.nullBuilder().add("null3")); + nullMap.add(nullMapCol2); + + resolvedTuple.add(nullMapCol); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(first.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + resolvedTuple.setRowCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("map1") + .addNullable("null1", MinorType.INT) + .addNullable("null2", MinorType.VARCHAR) + .addMap("map2") + .addNullable("null3", MinorType.INT) + .resumeMap() + .resumeSchema() + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, mapValue(null, null, singleMap(null)), "barney") + .addRow(20, mapValue(null, null, singleMap(null)), "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + resolvedTuple.close(); + } + + /** + * Test that the merger mechanism can rewrite a map to include + * projected null columns. 
+ */ + + @Test + public void testMapRevision() { + + // Create the first batch + + BatchSchema inputSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMap("a") + .add("c", MinorType.INT) + .resumeSchema() + .build(); + RowSetSource input = new RowSetSource( + fixture.rowSetBuilder(inputSchema) + .addRow("barney", singleMap(10)) + .addRow("wilma", singleMap(20)) + .build()); + + // Create mappings + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, + inputSchema.getColumn(1), 1); + resolvedTuple.add(mapCol); + ResolvedTuple map = mapCol.members(); + map.add(new TestProjection(map, 0)); + map.add(map.nullBuilder().add("null1")); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(input.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(input.rowSet().container(), output); + output.setRecordCount(input.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMap("a") + .add("c", MinorType.INT) + .addNullable("null1", MinorType.INT) + .resumeSchema() + .build(); + RowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("barney", mapValue(10, null)) + .addRow("wilma", mapValue(20, null)) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + /** + * Test that the merger mechanism can rewrite a map array to include + * projected null columns. 
+ */ + + @Test + public void testMapArrayRevision() { + + // Create the first batch + + BatchSchema inputSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMapArray("a") + .add("c", MinorType.INT) + .resumeSchema() + .build(); + RowSetSource input = new RowSetSource( + fixture.rowSetBuilder(inputSchema) + .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12))) + .addRow("wilma", mapArray(singleMap(20), singleMap(21))) + .build()); + + // Create mappings + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, + inputSchema.getColumn(1), 1); + resolvedTuple.add(mapCol); + ResolvedTuple map = mapCol.members(); + map.add(new TestProjection(map, 0)); + map.add(map.nullBuilder().add("null1")); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(input.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(input.rowSet().container(), output); + output.setRecordCount(input.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMapArray("a") + .add("c", MinorType.INT) + .addNullable("null1", MinorType.INT) + .resumeSchema() + .build(); + RowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("barney", mapArray( + mapValue(10, null), mapValue(11, null), mapValue(12, null))) + .addRow("wilma", mapArray( + mapValue(20, null), mapValue(21, null))) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java new file mode 100644 index 000000000..e07ad9a62 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestScanLevelProjection.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.scan.project; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.impl.scan.ScanTestUtils; +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; +import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn; +import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; +import org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn; +import org.apache.drill.exec.record.metadata.ProjectionType; +import org.apache.drill.test.SubOperatorTest; +import org.junit.Test; + +/** + * Test the level of projection done at the level of the scan as a whole; + * before knowledge of table "implicit" columns or the specific table schema. + */ + +public class TestScanLevelProjection extends SubOperatorTest { + + /** + * Basic test: select a set of columns (a, b, c) when the + * data source has an early schema of (a, c, d). (a, c) are + * projected, (d) is null. + */ + + @Test + public void testBasics() { + + // Simulate SELECT a, b, c ... + // Build the projection plan and verify + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("a", "b", "c"), + ScanTestUtils.parsers()); + assertFalse(scanProj.projectAll()); + assertFalse(scanProj.projectNone()); + + assertEquals(3, scanProj.requestedCols().size()); + assertEquals("a", scanProj.requestedCols().get(0).rootName()); + assertEquals("b", scanProj.requestedCols().get(1).rootName()); + assertEquals("c", scanProj.requestedCols().get(2).rootName()); + + assertEquals(3, scanProj.columns().size()); + assertEquals("a", scanProj.columns().get(0).name()); + assertEquals("b", scanProj.columns().get(1).name()); + assertEquals("c", scanProj.columns().get(2).name()); + + // Verify column type + + assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); + } + + /** + * Map projection occurs when a query contains project-list items with + * a dot, such as "a.b". We may not know the type of "b", but have + * just learned that "a" must be a map. 
+ */ + + @Test + public void testMap() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("a.x", "b.x", "a.y", "b.y", "c"), + ScanTestUtils.parsers()); + assertFalse(scanProj.projectAll()); + assertFalse(scanProj.projectNone()); + + assertEquals(3, scanProj.columns().size()); + assertEquals("a", scanProj.columns().get(0).name()); + assertEquals("b", scanProj.columns().get(1).name()); + assertEquals("c", scanProj.columns().get(2).name()); + + // Verify column type + + assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); + + // Map structure + + final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element(); + assertTrue(a.isTuple()); + assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("x")); + assertEquals(ProjectionType.UNSPECIFIED, a.mapProjection().projectionType("y")); + assertEquals(ProjectionType.UNPROJECTED, a.mapProjection().projectionType("z")); + + final RequestedColumn c = ((UnresolvedColumn) scanProj.columns().get(2)).element(); + assertTrue(c.isSimple()); + } + + /** + * Similar to maps, if the project list contains "a[1]" then we've learned that + * a is an array, but we don't know what type. + */ + + @Test + public void testArray() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("a[1]", "a[3]"), + ScanTestUtils.parsers()); + assertFalse(scanProj.projectAll()); + assertFalse(scanProj.projectNone()); + + assertEquals(1, scanProj.columns().size()); + assertEquals("a", scanProj.columns().get(0).name()); + + // Verify column type + + assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(0).nodeType()); + + // Map structure + + final RequestedColumn a = ((UnresolvedColumn) scanProj.columns().get(0)).element(); + assertTrue(a.isArray()); + assertFalse(a.hasIndex(0)); + assertTrue(a.hasIndex(1)); + assertFalse(a.hasIndex(2)); + assertTrue(a.hasIndex(3)); + } + + /** + * Simulate a SELECT * query by passing "*" as a column name. + */ + + @Test + public void testWildcard() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + + assertTrue(scanProj.projectAll()); + assertFalse(scanProj.projectNone()); + assertEquals(1, scanProj.requestedCols().size()); + assertTrue(scanProj.requestedCols().get(0).isDynamicStar()); + + assertEquals(1, scanProj.columns().size()); + assertEquals(SchemaPath.DYNAMIC_STAR, scanProj.columns().get(0).name()); + + // Verify bindings + + assertEquals(scanProj.columns().get(0).name(), scanProj.requestedCols().get(0).rootName()); + + // Verify column type + + assertEquals(UnresolvedColumn.WILDCARD, scanProj.columns().get(0).nodeType()); + } + + /** + * Test an empty projection which occurs in a + * SELECT COUNT(*) query. + */ + + @Test + public void testEmptyProjection() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList(), + ScanTestUtils.parsers()); + + assertFalse(scanProj.projectAll()); + assertTrue(scanProj.projectNone()); + assertEquals(0, scanProj.requestedCols().size()); + } + + /** + * Can't include both a wildcard and a column name. + */ + + @Test + public void testErrorWildcardAndColumns() { + try { + new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, "a"), + ScanTestUtils.parsers()); + fail(); + } catch (final IllegalArgumentException e) { + // Expected + } + } + + /** + * Can't include both a column name and a wildcard. 
+ */ + + @Test + public void testErrorColumnAndWildcard() { + try { + new ScanLevelProjection( + RowSetTestUtils.projectList("a", SchemaPath.DYNAMIC_STAR), + ScanTestUtils.parsers()); + fail(); + } catch (final IllegalArgumentException e) { + // Expected + } + } + + /** + * Can't include a wildcard twice. + * <p> + * Note: Drill actually allows this, but the work should be done + * in the project operator; scan should see at most one wildcard. + */ + + @Test + public void testErrorTwoWildcards() { + try { + new ScanLevelProjection( + RowSetTestUtils.projectList(SchemaPath.DYNAMIC_STAR, SchemaPath.DYNAMIC_STAR), + ScanTestUtils.parsers()); + fail(); + } catch (final UserException e) { + // Expected + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java new file mode 100644 index 000000000..839e2caa4 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaLevelProjection.java @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.scan.project; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.List; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.impl.scan.ScanTestUtils; +import org.apache.drill.exec.physical.impl.scan.project.ExplicitSchemaProjection; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedNullColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTableColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; +import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection; +import org.apache.drill.exec.physical.impl.scan.project.UnresolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.VectorSource; +import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection; +import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils; +import org.apache.drill.exec.record.metadata.TupleMetadata; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.junit.Test; + +/** + * "Schema level projection" describes one side of the projection + * mechanism. When we project, we have the set of column the user + * wants "the schema level" and the set of columns on offer from + * the data source "the scan level." The projection mechanism + * combines these to map out the actual projection. + */ + +public class TestSchemaLevelProjection extends SubOperatorTest { + + /** + * Test wildcard projection: take all columns on offer from + * the data source, in the order that the data source specifies. + */ + + @Test + public void testWildcard() { + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectAll(), + ScanTestUtils.parsers()); + assertEquals(1, scanProj.columns().size()); + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .addNullable("c", MinorType.INT) + .addArray("d", MinorType.FLOAT8) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new WildcardSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(3, columns.size()); + + assertEquals("a", columns.get(0).name()); + assertEquals(0, columns.get(0).sourceIndex()); + assertSame(rootTuple, columns.get(0).source()); + assertEquals("c", columns.get(1).name()); + assertEquals(1, columns.get(1).sourceIndex()); + assertSame(rootTuple, columns.get(1).source()); + assertEquals("d", columns.get(2).name()); + assertEquals(2, columns.get(2).sourceIndex()); + assertSame(rootTuple, columns.get(2).source()); + } + + /** + * Test SELECT list with columns defined in a order and with + * name case different than the early-schema table. 
+ */ + + @Test + public void testFullList() { + + // Simulate SELECT c, b, a ... + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("c", "b", "a"), + ScanTestUtils.parsers()); + assertEquals(3, scanProj.columns().size()); + + // Simulate a data source, with early schema, of (a, b, c) + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .add("B", MinorType.VARCHAR) + .add("C", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(3, columns.size()); + + assertEquals("c", columns.get(0).name()); + assertEquals(2, columns.get(0).sourceIndex()); + assertSame(rootTuple, columns.get(0).source()); + + assertEquals("b", columns.get(1).name()); + assertEquals(1, columns.get(1).sourceIndex()); + assertSame(rootTuple, columns.get(1).source()); + + assertEquals("a", columns.get(2).name()); + assertEquals(0, columns.get(2).sourceIndex()); + assertSame(rootTuple, columns.get(2).source()); + } + + /** + * Test SELECT list with columns missing from the table schema. + */ + + @Test + public void testMissing() { + + // Simulate SELECT c, v, b, w ... + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("c", "v", "b", "w"), + ScanTestUtils.parsers()); + assertEquals(4, scanProj.columns().size()); + + // Simulate a data source, with early schema, of (a, b, c) + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .add("B", MinorType.VARCHAR) + .add("C", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(4, columns.size()); + final VectorSource nullBuilder = rootTuple.nullBuilder(); + + assertEquals("c", columns.get(0).name()); + assertEquals(2, columns.get(0).sourceIndex()); + assertSame(rootTuple, columns.get(0).source()); + + assertEquals("v", columns.get(1).name()); + assertEquals(0, columns.get(1).sourceIndex()); + assertSame(nullBuilder, columns.get(1).source()); + + assertEquals("b", columns.get(2).name()); + assertEquals(1, columns.get(2).sourceIndex()); + assertSame(rootTuple, columns.get(2).source()); + + assertEquals("w", columns.get(3).name()); + assertEquals(1, columns.get(3).sourceIndex()); + assertSame(nullBuilder, columns.get(3).source()); + } + + /** + * Test an explicit projection (providing columns) in which the + * names in the project lists are a different case than the data + * source, the order of columns differs, and we ask for a + * subset of data source columns. + */ + @Test + public void testSubset() { + + // Simulate SELECT c, a ... 
+ + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("c", "a"), + ScanTestUtils.parsers()); + assertEquals(2, scanProj.columns().size()); + + // Simulate a data source, with early schema, of (a, b, c) + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .add("B", MinorType.VARCHAR) + .add("C", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(2, columns.size()); + + assertEquals("c", columns.get(0).name()); + assertEquals(2, columns.get(0).sourceIndex()); + assertSame(rootTuple, columns.get(0).source()); + + assertEquals("a", columns.get(1).name()); + assertEquals(0, columns.get(1).sourceIndex()); + assertSame(rootTuple, columns.get(1).source()); + } + + /** + * Drill is unique in that we can select (a, b) from a data source + * that only offers (c, d). We get null columns as a result. + */ + + @Test + public void testDisjoint() { + + // Simulate SELECT c, a ... + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("b"), + ScanTestUtils.parsers()); + assertEquals(1, scanProj.columns().size()); + + // Simulate a data source, with early schema, of (a) + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("A", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(1, columns.size()); + final VectorSource nullBuilder = rootTuple.nullBuilder(); + + assertEquals("b", columns.get(0).name()); + assertEquals(0, columns.get(0).sourceIndex()); + assertSame(nullBuilder, columns.get(0).source()); + } + + /** + * Test the obscure case that the data source contains a map, but we + * project only one of the members of the map. The output should be a + * map that contains only the members we request. + */ + + @Test + public void testOmittedMap() { + + // Simulate SELECT a, b.c.d ... + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("a", "b.c.d"), + ScanTestUtils.parsers()); + assertEquals(2, scanProj.columns().size()); + { + assertEquals(UnresolvedColumn.UNRESOLVED, scanProj.columns().get(1).nodeType()); + final UnresolvedColumn bCol = (UnresolvedColumn) (scanProj.columns().get(1)); + assertTrue(bCol.element().isTuple()); + } + + // Simulate a data source, with early schema, of (a) + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("a", MinorType.VARCHAR) + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(2, columns.size()); + + // Should have resolved a to a table column, b to a missing map. 
+ + // A is projected + + final ResolvedColumn aCol = columns.get(0); + assertEquals("a", aCol.name()); + assertEquals(ResolvedTableColumn.ID, aCol.nodeType()); + + // B is not projected, is implicitly a map + + final ResolvedColumn bCol = columns.get(1); + assertEquals("b", bCol.name()); + assertEquals(ResolvedMapColumn.ID, bCol.nodeType()); + + final ResolvedMapColumn bMap = (ResolvedMapColumn) bCol; + final ResolvedTuple bMembers = bMap.members(); + assertNotNull(bMembers); + assertEquals(1, bMembers.columns().size()); + + // C is a map within b + + final ResolvedColumn cCol = bMembers.columns().get(0); + assertEquals(ResolvedMapColumn.ID, cCol.nodeType()); + + final ResolvedMapColumn cMap = (ResolvedMapColumn) cCol; + final ResolvedTuple cMembers = cMap.members(); + assertNotNull(cMembers); + assertEquals(1, cMembers.columns().size()); + + // D is an unknown column type (not a map) + + final ResolvedColumn dCol = cMembers.columns().get(0); + assertEquals(ResolvedNullColumn.ID, dCol.nodeType()); + } + + /** + * Test of a map with missing columns. + * table of (a{b, c}), project a.c, a.d, a.e.f + */ + + @Test + public void testOmittedMapMembers() { + + // Simulate SELECT a.c, a.d, a.e.f ... + + final ScanLevelProjection scanProj = new ScanLevelProjection( + RowSetTestUtils.projectList("x", "a.c", "a.d", "a.e.f", "y"), + ScanTestUtils.parsers()); + assertEquals(3, scanProj.columns().size()); + + // Simulate a data source, with early schema, of (x, y, a{b, c}) + + final TupleMetadata tableSchema = new SchemaBuilder() + .add("x", MinorType.VARCHAR) + .add("y", MinorType.INT) + .addMap("a") + .add("b", MinorType.BIGINT) + .add("c", MinorType.FLOAT8) + .resumeSchema() + .buildSchema(); + + final NullColumnBuilder builder = new NullColumnBuilder(null, false); + final ResolvedRow rootTuple = new ResolvedRow(builder); + new ExplicitSchemaProjection( + scanProj, tableSchema, rootTuple, + ScanTestUtils.resolvers()); + + final List<ResolvedColumn> columns = rootTuple.columns(); + assertEquals(3, columns.size()); + + // Should have resolved a.b to a map column, + // a.d to a missing nested map, and a.e.f to a missing + // nested map member + + // X is projected + + final ResolvedColumn xCol = columns.get(0); + assertEquals("x", xCol.name()); + assertEquals(ResolvedTableColumn.ID, xCol.nodeType()); + assertSame(rootTuple, ((ResolvedTableColumn) (xCol)).source()); + assertEquals(0, ((ResolvedTableColumn) (xCol)).sourceIndex()); + + // Y is projected + + final ResolvedColumn yCol = columns.get(2); + assertEquals("y", yCol.name()); + assertEquals(ResolvedTableColumn.ID, yCol.nodeType()); + assertSame(rootTuple, ((ResolvedTableColumn) (yCol)).source()); + assertEquals(1, ((ResolvedTableColumn) (yCol)).sourceIndex()); + + // A is projected + + final ResolvedColumn aCol = columns.get(1); + assertEquals("a", aCol.name()); + assertEquals(ResolvedMapColumn.ID, aCol.nodeType()); + + final ResolvedMapColumn aMap = (ResolvedMapColumn) aCol; + final ResolvedTuple aMembers = aMap.members(); + assertFalse(aMembers.isSimpleProjection()); + assertNotNull(aMembers); + assertEquals(3, aMembers.columns().size()); + + // a.c is projected + + final ResolvedColumn acCol = aMembers.columns().get(0); + assertEquals("c", acCol.name()); + assertEquals(ResolvedTableColumn.ID, acCol.nodeType()); + assertEquals(1, ((ResolvedTableColumn) (acCol)).sourceIndex()); + + // a.d is not in the table, is null + + final ResolvedColumn adCol = aMembers.columns().get(1); + assertEquals("d", adCol.name()); + 
+
+    // a.e is not in the table, is implicitly a map
+
+    final ResolvedColumn aeCol = aMembers.columns().get(2);
+    assertEquals("e", aeCol.name());
+    assertEquals(ResolvedMapColumn.ID, aeCol.nodeType());
+
+    final ResolvedMapColumn aeMap = (ResolvedMapColumn) aeCol;
+    final ResolvedTuple aeMembers = aeMap.members();
+    assertNotNull(aeMembers);
+    assertFalse(aeMembers.isSimpleProjection());
+    assertEquals(1, aeMembers.columns().size());
+
+    // a.e.f is a null column
+
+    final ResolvedColumn aefCol = aeMembers.columns().get(0);
+    assertEquals("f", aefCol.name());
+    assertEquals(ResolvedNullColumn.ID, aefCol.nodeType());
+  }
+
+  /**
+   * Simple map project. This is an internal case in which the
+   * query asks for a set of columns inside a map, and the table
+   * loader produces exactly that set. No special projection is
+   * needed, the map is projected as a whole.
+   */
+
+  @Test
+  public void testSimpleMapProject() {
+
+    // Simulate SELECT a.b, a.c ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a.b", "a.c"),
+        ScanTestUtils.parsers());
+    assertEquals(1, scanProj.columns().size());
+
+    // Simulate a data source, with early schema, of (a{b, c})
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addMap("a")
+          .add("b", MinorType.BIGINT)
+          .add("c", MinorType.FLOAT8)
+          .resumeSchema()
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(1, columns.size());
+
+    // Should have resolved the map a to a single table column:
+    // the projected members exactly match the table's map members.
+
+    // a is projected as a vector, not as a structured map
+
+    final ResolvedColumn aCol = columns.get(0);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+    assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+  }
+
+  /**
+   * Project of a non-map as a map.
+   */
+
+  @Test
+  public void testMapMismatch() {
+
+    // Simulate SELECT a.b ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a.b"),
+        ScanTestUtils.parsers());
+
+    // Simulate a data source, with early schema, of (a)
+    // where a is not a map.
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    try {
+      new ExplicitSchemaProjection(
+          scanProj, tableSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final UserException e) {
+      // Expected
+    }
+  }
+
+  /**
+   * Test project of an array. At the scan level, we just verify
+   * that the requested column is, indeed, an array.
+   */
+
+  @Test
+  public void testArrayProject() {
+
+    // Simulate SELECT a[0] ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a[0]"),
+        ScanTestUtils.parsers());
+
+    // Simulate a data source, with early schema, of (a)
+    // where a is an array.
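+    // (addArray creates a REPEATED-mode VARCHAR here, which is what the
+    // a[0] projection requires, so resolution should succeed.)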
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addArray("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    new ExplicitSchemaProjection(
+        scanProj, tableSchema, rootTuple,
+        ScanTestUtils.resolvers());
+
+    final List<ResolvedColumn> columns = rootTuple.columns();
+    assertEquals(1, columns.size());
+
+    final ResolvedColumn aCol = columns.get(0);
+    assertEquals("a", aCol.name());
+    assertEquals(ResolvedTableColumn.ID, aCol.nodeType());
+    assertSame(rootTuple, ((ResolvedTableColumn) (aCol)).source());
+    assertEquals(0, ((ResolvedTableColumn) (aCol)).sourceIndex());
+  }
+
+  /**
+   * Project of a non-array as an array.
+   */
+
+  @Test
+  public void testArrayMismatch() {
+
+    // Simulate SELECT a[0] ...
+
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectList("a[0]"),
+        ScanTestUtils.parsers());
+
+    // Simulate a data source, with early schema, of (a)
+    // where a is not an array.
+
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.VARCHAR)
+        .buildSchema();
+
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    try {
+      new ExplicitSchemaProjection(
+          scanProj, tableSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final UserException e) {
+      // Expected
+    }
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
new file mode 100644
index 000000000..977cd216b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestSchemaSmoothing.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.SchemaTracker;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils;
+import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn;
+import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow;
+import org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ReaderSchemaOrchestrator;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother;
+import org.apache.drill.exec.physical.impl.scan.project.SchemaSmoother.IncompatibleSchemaException;
+import org.apache.drill.exec.physical.impl.scan.project.SmoothingProjection;
+import org.apache.drill.exec.physical.impl.scan.project.WildcardSchemaProjection;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.impl.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Tests schema smoothing at the schema projection level.
+ * This level handles reusing prior types when filling null
+ * values. But, because no actual vectors are involved, it
+ * does not handle the schema chosen for a table ahead of
+ * time, only the schema as it is merged with prior schema to
+ * detect missing columns.
+ * <p>
+ * Focuses on the <tt>SmoothingProjection</tt> class itself.
+ * <p>
+ * Note that, at present, schema smoothing does not work for entire
+ * maps. That is, if file 1 has, say, <tt>{a: {b: 10, c: "foo"}}</tt>
+ * and file 2 has, say, <tt>{a: null}</tt>, then schema smoothing does
+ * not currently know how to recreate the map. The same is true of
+ * lists and unions. Handling such cases is complex and is probably
+ * better handled via a system that allows the user to specify their
+ * intent by providing a schema to apply to the two files.
+ * <p>
+ * Note that schema smoothing itself is an experimental work-around
+ * to a fundamental limitation in Drill:
+ * <ul>
+ * <li>Drill cannot predict the future: each file (or batch)
+ * may have a different schema.</li>
+ * <li>Drill does not know about these differences until they
+ * occur.</li>
+ * <li>The scan operator is obligated to return the same schema
+ * (and same vectors) from one file to the next, else a "hard"
+ * schema change is sent downstream.</li>
+ * </ul>
+ *
+ * The problem is actually intractable. The schema smoother handles the
+ * cases that can be handled, such as required --> nullable, a column
+ * disappearing, etc. This whole mechanism should be scrapped if/when
+ * Drill can work with schemas. Or, keep this to handle, as best we can,
+ * odd schemas, but insist on a schema to resolve issues that this
+ * mechanism cannot handle (and that, indeed, no algorithm could handle
+ * because such an algorithm would require time-travel: looking into
+ * the future to know what data will be scanned.)
+ */
+
+public class TestSchemaSmoothing extends SubOperatorTest {
+
+  /**
+   * Low-level test of the smoothing projection, including the exceptions
+   * it throws when things are not going its way.
+   */
+
+  @Test
+  public void testSmoothingProjection() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+    final TupleMetadata schema1 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    ResolvedRow priorSchema;
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new WildcardSchemaProjection(
+          scanProj, schema1, rootTuple,
+          ScanTestUtils.resolvers());
+      priorSchema = rootTuple;
+    }
+
+    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+    final TupleMetadata schema2 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema2, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      priorSchema = rootTuple;
+    } catch (final IncompatibleSchemaException e) {
+      fail();
+    }
+
+    // Table 3: (a, b, c, d), column added, must replan schema
+
+    final TupleMetadata schema3 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema3, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+
+    // Table 4: (a: double), change type, must replan schema
+
+    final TupleMetadata schema4 = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema4, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+
+    // Table 5: Drop a non-nullable column, must replan
+
+    final TupleMetadata schema5 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    try {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      new SmoothingProjection(
+          scanProj, schema5, priorSchema, rootTuple,
+          ScanTestUtils.resolvers());
+      fail();
+    } catch (final IncompatibleSchemaException e) {
+      // Expected
+    }
+  }
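+
+  // Informal summary of the rules exercised above: a missing nullable
+  // column "smooths" (the prior schema is reused); an added column, a
+  // changed type, or a dropped required column forces a replan via
+  // IncompatibleSchemaException.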
+
+  /**
+   * Case in which the table schema is a superset of the prior
+   * schema. Discard prior schema. Turn off auto expansion of
+   * metadata for a simpler test.
+   */
+
+  @Test
+  public void testSmaller() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      smoother.resolve(priorSchema, rootTuple);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+    {
+      final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+      final ResolvedRow rootTuple = new ResolvedRow(builder);
+      smoother.resolve(tableSchema, rootTuple);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  /**
+   * Case in which the table schema and prior are disjoint
+   * sets. Discard the prior schema.
+   */
+
+  @Test
+  public void testDisjoint() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  private ResolvedRow doResolve(SchemaSmoother smoother, TupleMetadata schema) {
+    final NullColumnBuilder builder = new NullColumnBuilder(null, false);
+    final ResolvedRow rootTuple = new ResolvedRow(builder);
+    smoother.resolve(schema, rootTuple);
+    return rootTuple;
+  }
+
+  /**
+   * Column names match, but types differ. Discard the prior schema.
+   */
+
+  @Test
+  public void testDifferentTypes() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  /**
+   * The prior and table schemas are identical. Preserve the prior
+   * schema (though the output is no different than if we discarded
+   * the prior schema...)
+   */
+
+  @Test
+  public void testSameSchemas() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      final TupleMetadata actualSchema = ScanTestUtils.schema(rootTuple);
+      assertTrue(actualSchema.isEquivalent(tableSchema));
+      assertTrue(actualSchema.isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * The prior and table schemas are identical, but the case of the
+   * names differs. Preserve the case of the first schema.
+   */
+
+  @Test
+  public void testDifferentCase() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("A", MinorType.INT)
+        .add("B", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+      final List<ResolvedColumn> columns = rootTuple.columns();
+      assertEquals("a", columns.get(0).name());
+    }
+  }
+
+  /**
+   * Can't preserve the prior schema if it had required columns
+   * that the new schema lacks.
+   */
+
+  @Test
+  public void testRequired() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(2, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(tableSchema));
+    }
+  }
+
+  /**
+   * Preserve the prior schema if the table is a subset and the missing
+   * columns are nullable or repeated.
+   */
+
+  @Test
+  public void testMissingNullableColumns() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * Preserve the prior schema if the table is a subset. Map the table
+   * columns to the output using the prior schema ordering.
+   */
+
+  @Test
+  public void testReordering() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    final TupleMetadata priorSchema = new SchemaBuilder()
+        .addNullable("a", MinorType.INT)
+        .add("b", MinorType.VARCHAR)
+        .addArray("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata tableSchema = new SchemaBuilder()
+        .add("b", MinorType.VARCHAR)
+        .addNullable("a", MinorType.INT)
+        .buildSchema();
+
+    {
+      doResolve(smoother, priorSchema);
+    }
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, tableSchema);
+      assertEquals(1, smoother.schemaVersion());
+      assertTrue(ScanTestUtils.schema(rootTuple).isEquivalent(priorSchema));
+    }
+  }
+
+  /**
+   * Integrated test across multiple schemas at the batch level.
+   */
+
+  @Test
+  public void testSmoothableSchemaBatches() {
+    final ScanLevelProjection scanProj = new ScanLevelProjection(
+        RowSetTestUtils.projectAll(),
+        ScanTestUtils.parsers());
+
+    final SchemaSmoother smoother = new SchemaSmoother(scanProj,
+        ScanTestUtils.resolvers());
+
+    // Table 1: (a: nullable bigint, b: nullable varchar, c: float8)
+
+    final TupleMetadata schema1 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema1);
+
+      // Just use the original schema.
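+      // (The first schema always resolves as-is; smoothing applies only
+      // to the tables that follow.)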
+
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(1, smoother.schemaVersion());
+    }
+
+    // Table 2: (a: nullable bigint, c), column omitted, original schema preserved
+
+    final TupleMetadata schema2 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .add("c", MinorType.FLOAT8)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema2);
+      assertTrue(schema1.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(1, smoother.schemaVersion());
+    }
+
+    // Table 3: (a, b, c, d), column added, must replan schema
+
+    final TupleMetadata schema3 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .add("c", MinorType.FLOAT8)
+        .add("d", MinorType.INT)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema3);
+      assertTrue(schema3.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(2, smoother.schemaVersion());
+    }
+
+    // Table 4: Drop a non-nullable column, must replan
+
+    final TupleMetadata schema4 = new SchemaBuilder()
+        .addNullable("a", MinorType.BIGINT)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema4);
+      assertTrue(schema4.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(3, smoother.schemaVersion());
+    }
+
+    // Table 5: (a: double), change type, must replan schema
+
+    final TupleMetadata schema5 = new SchemaBuilder()
+        .addNullable("a", MinorType.FLOAT8)
+        .addNullable("b", MinorType.VARCHAR)
+        .buildSchema();
+    {
+      final ResolvedRow rootTuple = doResolve(smoother, schema5);
+      assertTrue(schema5.isEquivalent(ScanTestUtils.schema(rootTuple)));
+      assertEquals(4, smoother.schemaVersion());
+    }
+  }
+
+  /**
+   * A SELECT * query uses the schema of the table as the output schema.
+   * This is trivial when the scanner has one table. But, if two or more
+   * tables occur, then things get interesting. The first table sets the
+   * schema. The second table then has:
+   * <ul>
+   * <li>The same schema, the trivial case.</li>
+   * <li>A subset of the first table. The type of the "missing" column
+   * from the first table is used for a null column in the second table.</li>
+   * <li>A superset or disjoint set of the first schema. This triggers a
+   * hard schema change.</li>
+   * </ul>
+   * <p>
+   * It is an open question whether previous columns should be preserved on
+   * a hard reset. For now, the code implements, and this test verifies, that a
+   * hard reset clears the "memory" of prior schemas.
+   */
+
+  @Test
+  public void testWildcardSmoothing() {
+    final ScanSchemaOrchestrator projector = new ScanSchemaOrchestrator(fixture.allocator());
+    projector.enableSchemaSmoothing(true);
+    projector.build(RowSetTestUtils.projectAll());
+
+    final TupleMetadata firstSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .addNullable("c", MinorType.BIGINT)
+        .buildSchema();
+    final TupleMetadata subsetSchema = new SchemaBuilder()
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .add("a", MinorType.INT)
+        .buildSchema();
+    final TupleMetadata disjointSchema = new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .addNullable("b", MinorType.VARCHAR, 10)
+        .add("d", MinorType.VARCHAR)
+        .buildSchema();
+
+    final SchemaTracker tracker = new SchemaTracker();
+    int schemaVersion;
+    {
+      // First table, establishes the baseline
+      // ... FROM table 1
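+
+      // (This block and the three that follow share the same pattern:
+      // start a reader, load rows through the ResultSetLoader, then
+      // verify the scan output against an expected row set.)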
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(10, "fred", 110L)
+          .addRow(20, "wilma", 110L);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      schemaVersion = tracker.schemaVersion();
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(10, "fred", 110L)
+          .addRow(20, "wilma", 110L)
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Second table, same schema, the trivial case
+      // ... FROM table 2
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(firstSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(70, "pebbles", 770L)
+          .addRow(80, "hoppy", 880L);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(70, "pebbles", 770L)
+          .addRow(80, "hoppy", 880L)
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Third table: subset schema of the first two
+      // ... FROM table 3
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(subsetSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow("bambam", 30)
+          .addRow("betty", 40);
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(firstSchema)
+          .addRow(30, "bambam", null)
+          .addRow(40, "betty", null)
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+    {
+      // Fourth table: disjoint schema, causes a schema reset
+      // ... FROM table 4
+
+      final ReaderSchemaOrchestrator reader = projector.startReader();
+      final ResultSetLoader loader = reader.makeTableLoader(disjointSchema);
+
+      reader.startBatch();
+      loader.writer()
+          .addRow(50, "dino", "supporting")
+          .addRow(60, "barney", "main");
+      reader.endBatch();
+
+      tracker.trackSchema(projector.output());
+      assertNotEquals(schemaVersion, tracker.schemaVersion());
+
+      final SingleRowSet expected = fixture.rowSetBuilder(disjointSchema)
+          .addRow(50, "dino", "supporting")
+          .addRow(60, "barney", "main")
+          .build();
+      new RowSetComparison(expected)
+          .verifyAndClearAll(fixture.wrap(projector.output()));
+    }
+
+    projector.close();
+  }
+
+  // TODO: Test schema smoothing with repeated
+  // TODO: Test hard schema change
+  // TODO: Typed null column tests (resurrect)
+  // TODO: Test maps and arrays of maps
+}