diff options
Diffstat (limited to 'exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java')
-rw-r--r-- | exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java | 478 |
1 files changed, 478 insertions, 0 deletions
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java new file mode 100644 index 000000000..cb0365ca9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.scan.project; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.impl.scan.project.NullColumnBuilder; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedMapColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedColumn; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple; +import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple.ResolvedRow; +import org.apache.drill.exec.physical.impl.scan.project.VectorSource; +import org.apache.drill.exec.physical.rowSet.ResultVectorCache; +import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.schema.SchemaBuilder; +import org.apache.drill.test.rowSet.RowSetComparison; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.DrillBuf; + +import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue; +import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap; +import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray; + + +/** + * Test the row batch merger by merging two batches. Tests both the + * "direct" and "exchange" cases. Direct means that the output container + * contains the source vector directly: they are the same vectors. + * Exchange means we have two vectors, but we swap the underlying + * Drillbufs to effectively shift data from source to destination + * vector. + */ + +public class TestRowBatchMerger extends SubOperatorTest { + + public static class RowSetSource implements VectorSource { + + private SingleRowSet rowSet; + + public RowSetSource(SingleRowSet rowSet) { + this.rowSet = rowSet; + } + + public RowSet rowSet() { return rowSet; } + + public void clear() { + rowSet.clear(); + } + + @Override + public ValueVector vector(int index) { + return rowSet.container().getValueVector(index).getValueVector(); + } + } + + public static final int SLAB_SIZE = 16 * 1024 * 1024; + + @BeforeClass + public static void setup() { + // Party on 10 16MB blocks of memory to detect vector issues + DrillBuf bufs[] = new DrillBuf[10]; + for (int i = 0; i < bufs.length; i++) { + bufs[i] = fixture.allocator().buffer(SLAB_SIZE); + for (int j = 0; j < SLAB_SIZE / 4; j++) { + bufs[i].setInt(j * 4, 0xDEADBEEF); + } + } + for (int i = 0; i < bufs.length; i++) { + bufs[i].release(); + } + } + + private RowSetSource makeFirst() { + BatchSchema firstSchema = new SchemaBuilder() + .add("d", MinorType.VARCHAR) + .add("a", MinorType.INT) + .build(); + return new RowSetSource( + fixture.rowSetBuilder(firstSchema) + .addRow("barney", 10) + .addRow("wilma", 20) + .build()); + } + + private RowSetSource makeSecond() { + BatchSchema secondSchema = new SchemaBuilder() + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .build(); + return new RowSetSource( + fixture.rowSetBuilder(secondSchema) + .addRow(1, "foo.csv") + .addRow(2, "foo.csv") + .build()); + } + + public static class TestProjection extends ResolvedColumn { + + public TestProjection(VectorSource source, int sourceIndex) { + super(source, sourceIndex); + } + + @Override + public String name() { return null; } + + @Override + public int nodeType() { return -1; } + + @Override + public MaterializedField schema() { return null; } + } + + @Test + public void testSimpleFlat() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create the second batch + + RowSetSource second = makeSecond(); + + ResolvedRow resolvedTuple = new ResolvedRow(null); + resolvedTuple.add(new TestProjection(first, 1)); + resolvedTuple.add(new TestProjection(second, 0)); + resolvedTuple.add(new TestProjection(second, 1)); + resolvedTuple.add(new TestProjection(first, 0)); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(null, output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, 1, "foo.csv", "barney") + .addRow(20, 2, "foo.csv", "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + @Test + public void testImplicitFlat() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create the second batch + + RowSetSource second = makeSecond(); + + ResolvedRow resolvedTuple = new ResolvedRow(null); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + resolvedTuple.add(new TestProjection(second, 0)); + resolvedTuple.add(new TestProjection(second, 1)); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.INT) + .add("c", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, 1, "foo.csv", "barney") + .addRow(20, 2, "foo.csv", "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + @Test + public void testFlatWithNulls() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create null columns + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + + ResolvedRow resolvedTuple = new ResolvedRow(builder); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + resolvedTuple.add(resolvedTuple.nullBuilder().add("null1")); + resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + builder.build(cache); + builder.load(first.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + output.setRecordCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addNullable("null1", MinorType.INT) + .addNullable("null2", MinorType.VARCHAR) + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, null, null, "barney") + .addRow(20, null, null, "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + builder.close(); + } + + /** + * Test the ability to create maps from whole cloth if requested in + * the projection list, and the map is not available from the data + * source. + */ + + @Test + public void testNullMaps() { + + // Create the first batch + + RowSetSource first = makeFirst(); + + // Create null columns + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + resolvedTuple.add(new TestProjection(resolvedTuple, 1)); + + ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1"); + ResolvedTuple nullMap = nullMapCol.members(); + nullMap.add(nullMap.nullBuilder().add("null1")); + nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR))); + + ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2"); + ResolvedTuple nullMap2 = nullMapCol2.members(); + nullMap2.add(nullMap2.nullBuilder().add("null3")); + nullMap.add(nullMapCol2); + + resolvedTuple.add(nullMapCol); + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(first.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(first.rowSet().container(), output); + resolvedTuple.setRowCount(first.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("a", MinorType.INT) + .addMap("map1") + .addNullable("null1", MinorType.INT) + .addNullable("null2", MinorType.VARCHAR) + .addMap("map2") + .addNullable("null3", MinorType.INT) + .resumeMap() + .resumeSchema() + .add("d", MinorType.VARCHAR) + .build(); + SingleRowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow(10, mapValue(null, null, singleMap(null)), "barney") + .addRow(20, mapValue(null, null, singleMap(null)), "wilma") + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + resolvedTuple.close(); + } + + /** + * Test that the merger mechanism can rewrite a map to include + * projected null columns. + */ + + @Test + public void testMapRevision() { + + // Create the first batch + + BatchSchema inputSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMap("a") + .add("c", MinorType.INT) + .resumeSchema() + .build(); + RowSetSource input = new RowSetSource( + fixture.rowSetBuilder(inputSchema) + .addRow("barney", singleMap(10)) + .addRow("wilma", singleMap(20)) + .build()); + + // Create mappings + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, + inputSchema.getColumn(1), 1); + resolvedTuple.add(mapCol); + ResolvedTuple map = mapCol.members(); + map.add(new TestProjection(map, 0)); + map.add(map.nullBuilder().add("null1")); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(input.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(input.rowSet().container(), output); + output.setRecordCount(input.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMap("a") + .add("c", MinorType.INT) + .addNullable("null1", MinorType.INT) + .resumeSchema() + .build(); + RowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("barney", mapValue(10, null)) + .addRow("wilma", mapValue(20, null)) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + + /** + * Test that the merger mechanism can rewrite a map array to include + * projected null columns. + */ + + @Test + public void testMapArrayRevision() { + + // Create the first batch + + BatchSchema inputSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMapArray("a") + .add("c", MinorType.INT) + .resumeSchema() + .build(); + RowSetSource input = new RowSetSource( + fixture.rowSetBuilder(inputSchema) + .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12))) + .addRow("wilma", mapArray(singleMap(20), singleMap(21))) + .build()); + + // Create mappings + + NullColumnBuilder builder = new NullColumnBuilder(null, false); + ResolvedRow resolvedTuple = new ResolvedRow(builder); + + resolvedTuple.add(new TestProjection(resolvedTuple, 0)); + ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple, + inputSchema.getColumn(1), 1); + resolvedTuple.add(mapCol); + ResolvedTuple map = mapCol.members(); + map.add(new TestProjection(map, 0)); + map.add(map.nullBuilder().add("null1")); + + // Build the null values + + ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator()); + resolvedTuple.buildNulls(cache); + + // LoadNulls + + resolvedTuple.loadNulls(input.rowSet().rowCount()); + + // Do the merge + + VectorContainer output = new VectorContainer(fixture.allocator()); + resolvedTuple.project(input.rowSet().container(), output); + output.setRecordCount(input.rowSet().rowCount()); + RowSet result = fixture.wrap(output); + + // Verify + + BatchSchema expectedSchema = new SchemaBuilder() + .add("b", MinorType.VARCHAR) + .addMapArray("a") + .add("c", MinorType.INT) + .addNullable("null1", MinorType.INT) + .resumeSchema() + .build(); + RowSet expected = fixture.rowSetBuilder(expectedSchema) + .addRow("barney", mapArray( + mapValue(10, null), mapValue(11, null), mapValue(12, null))) + .addRow("wilma", mapArray( + mapValue(20, null), mapValue(21, null))) + .build(); + + new RowSetComparison(expected) + .verifyAndClearAll(result); + } + +} |