Diffstat (limited to 'exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java')
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java  478
1 file changed, 478 insertions(+), 0 deletions(-)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
new file mode 100644
index 000000000..cb0365ca9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/project/TestRowBatchMerger.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.physical.rowSet.impl.NullResultVectorCacheImpl;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.DrillBuf;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+
+/**
+ * Test the row batch merger by merging two batches. Tests both the
+ * "direct" and "exchange" cases. Direct means that the output container
+ * contains the source vector directly: they are the same vectors.
+ * Exchange means we have two vectors and swap their underlying
+ * DrillBufs to effectively shift data from the source to the
+ * destination vector.
+ */
+
+public class TestRowBatchMerger extends SubOperatorTest {
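+
+ // A rough sketch of the two cases, as illustrative pseudo-code
+ // (these names are not the actual merger API):
+ //
+ //   output.add(sourceVector);   // direct: output adopts the same vector
+ //   dest.exchange(source);      // exchange: swap the underlying buffers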
+
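+ /**
+ * Wraps a row set as a {@link VectorSource}: the merger pulls
+ * vectors from the wrapped container by position.
+ */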
+ public static class RowSetSource implements VectorSource {
+
+ private SingleRowSet rowSet;
+
+ public RowSetSource(SingleRowSet rowSet) {
+ this.rowSet = rowSet;
+ }
+
+ public RowSet rowSet() { return rowSet; }
+
+ public void clear() {
+ rowSet.clear();
+ }
+
+ @Override
+ public ValueVector vector(int index) {
+ return rowSet.container().getValueVector(index).getValueVector();
+ }
+ }
+
+ public static final int SLAB_SIZE = 16 * 1024 * 1024;
+
+ @BeforeClass
+ public static void setup() {
+ // Party on 10 16MB blocks of memory, writing 0xDEADBEEF everywhere,
+ // then release them: if a vector later reads memory it does not own,
+ // the stale pattern makes the bug obvious.
+ DrillBuf[] bufs = new DrillBuf[10];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = fixture.allocator().buffer(SLAB_SIZE);
+ for (int j = 0; j < SLAB_SIZE / 4; j++) {
+ bufs[i].setInt(j * 4, 0xDEADBEEF);
+ }
+ }
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i].release();
+ }
+ }
+
+ private RowSetSource makeFirst() {
+ BatchSchema firstSchema = new SchemaBuilder()
+ .add("d", MinorType.VARCHAR)
+ .add("a", MinorType.INT)
+ .build();
+ return new RowSetSource(
+ fixture.rowSetBuilder(firstSchema)
+ .addRow("barney", 10)
+ .addRow("wilma", 20)
+ .build());
+ }
+
+ private RowSetSource makeSecond() {
+ BatchSchema secondSchema = new SchemaBuilder()
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .build();
+ return new RowSetSource(
+ fixture.rowSetBuilder(secondSchema)
+ .addRow(1, "foo.csv")
+ .addRow(2, "foo.csv")
+ .build());
+ }
+
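+ /**
+ * Minimal {@link ResolvedColumn} stub for these tests: only the
+ * vector source and source index matter to the merge; the name,
+ * node type, and schema are unused.
+ */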
+ public static class TestProjection extends ResolvedColumn {
+
+ public TestProjection(VectorSource source, int sourceIndex) {
+ super(source, sourceIndex);
+ }
+
+ @Override
+ public String name() { return null; }
+
+ @Override
+ public int nodeType() { return -1; }
+
+ @Override
+ public MaterializedField schema() { return null; }
+ }
+
+ @Test
+ public void testSimpleFlat() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create the second batch
+
+ RowSetSource second = makeSecond();
+
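+ // Merge the two sources, reordering columns so the output is
+ // (a, b, c, d): a = first[1], b = second[0], c = second[1],
+ // d = first[0].
+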
+ ResolvedRow resolvedTuple = new ResolvedRow(null);
+ resolvedTuple.add(new TestProjection(first, 1));
+ resolvedTuple.add(new TestProjection(second, 0));
+ resolvedTuple.add(new TestProjection(second, 1));
+ resolvedTuple.add(new TestProjection(first, 0));
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(null, output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, 1, "foo.csv", "barney")
+ .addRow(20, 2, "foo.csv", "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ @Test
+ public void testImplicitFlat() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create the second batch
+
+ RowSetSource second = makeSecond();
+
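+ // Here the resolved row itself is the vector source: its indexes
+ // resolve against the input container passed to project() below.
+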
+ ResolvedRow resolvedTuple = new ResolvedRow(null);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+ resolvedTuple.add(new TestProjection(second, 0));
+ resolvedTuple.add(new TestProjection(second, 1));
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .add("b", MinorType.INT)
+ .add("c", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, 1, "foo.csv", "barney")
+ .addRow(20, 2, "foo.csv", "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ @Test
+ public void testFlatWithNulls() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create null columns
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+
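+ // A null column with no declared type defaults to nullable INT;
+ // null2 is explicitly declared as nullable VARCHAR.
+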
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+ resolvedTuple.add(resolvedTuple.nullBuilder().add("null1"));
+ resolvedTuple.add(resolvedTuple.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ builder.build(cache);
+ builder.load(first.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ output.setRecordCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .addNullable("null2", MinorType.VARCHAR)
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, null, null, "barney")
+ .addRow(20, null, null, "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ builder.close();
+ }
+
+ /**
+ * Test the ability to create maps from whole cloth if requested in
+ * the projection list, when the map is not available from the data
+ * source.
+ */
+
+ @Test
+ public void testNullMaps() {
+
+ // Create the first batch
+
+ RowSetSource first = makeFirst();
+
+ // Create null columns
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 1));
+
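+ // Build map1 from whole cloth: two null members plus a nested
+ // null map (map2) that itself holds a single null member.
+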
+ ResolvedMapColumn nullMapCol = new ResolvedMapColumn(resolvedTuple, "map1");
+ ResolvedTuple nullMap = nullMapCol.members();
+ nullMap.add(nullMap.nullBuilder().add("null1"));
+ nullMap.add(nullMap.nullBuilder().add("null2", Types.optional(MinorType.VARCHAR)));
+
+ ResolvedMapColumn nullMapCol2 = new ResolvedMapColumn(nullMap, "map2");
+ ResolvedTuple nullMap2 = nullMapCol2.members();
+ nullMap2.add(nullMap2.nullBuilder().add("null3"));
+ nullMap.add(nullMapCol2);
+
+ resolvedTuple.add(nullMapCol);
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // Load the nulls
+
+ resolvedTuple.loadNulls(first.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(first.rowSet().container(), output);
+ resolvedTuple.setRowCount(first.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .addMap("map1")
+ .addNullable("null1", MinorType.INT)
+ .addNullable("null2", MinorType.VARCHAR)
+ .addMap("map2")
+ .addNullable("null3", MinorType.INT)
+ .resumeMap()
+ .resumeSchema()
+ .add("d", MinorType.VARCHAR)
+ .build();
+ SingleRowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow(10, mapValue(null, null, singleMap(null)), "barney")
+ .addRow(20, mapValue(null, null, singleMap(null)), "wilma")
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ resolvedTuple.close();
+ }
+
+ /**
+ * Test that the merger mechanism can rewrite a map to include
+ * projected null columns.
+ */
+
+ @Test
+ public void testMapRevision() {
+
+ // Create the first batch
+
+ BatchSchema inputSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMap("a")
+ .add("c", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSetSource input = new RowSetSource(
+ fixture.rowSetBuilder(inputSchema)
+ .addRow("barney", singleMap(10))
+ .addRow("wilma", singleMap(20))
+ .build());
+
+ // Create mappings
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
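+ // Column b passes through unchanged; map a keeps its input member
+ // c and gains the null column null1.
+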
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+ inputSchema.getColumn(1), 1);
+ resolvedTuple.add(mapCol);
+ ResolvedTuple map = mapCol.members();
+ map.add(new TestProjection(map, 0));
+ map.add(map.nullBuilder().add("null1"));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // Load the nulls
+
+ resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(input.rowSet().container(), output);
+ output.setRecordCount(input.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMap("a")
+ .add("c", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("barney", mapValue(10, null))
+ .addRow("wilma", mapValue(20, null))
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+ /**
+ * Test that the merger mechanism can rewrite a map array to include
+ * projected null columns.
+ */
+
+ @Test
+ public void testMapArrayRevision() {
+
+ // Create the first batch
+
+ BatchSchema inputSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMapArray("a")
+ .add("c", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSetSource input = new RowSetSource(
+ fixture.rowSetBuilder(inputSchema)
+ .addRow("barney", mapArray(singleMap(10), singleMap(11), singleMap(12)))
+ .addRow("wilma", mapArray(singleMap(20), singleMap(21)))
+ .build());
+
+ // Create mappings
+
+ NullColumnBuilder builder = new NullColumnBuilder(null, false);
+ ResolvedRow resolvedTuple = new ResolvedRow(builder);
+
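+ // Same rewrite as testMapRevision, but the map is repeated: the
+ // null column must be populated for every element of every row's
+ // map array.
+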
+ resolvedTuple.add(new TestProjection(resolvedTuple, 0));
+ ResolvedMapColumn mapCol = new ResolvedMapColumn(resolvedTuple,
+ inputSchema.getColumn(1), 1);
+ resolvedTuple.add(mapCol);
+ ResolvedTuple map = mapCol.members();
+ map.add(new TestProjection(map, 0));
+ map.add(map.nullBuilder().add("null1"));
+
+ // Build the null values
+
+ ResultVectorCache cache = new NullResultVectorCacheImpl(fixture.allocator());
+ resolvedTuple.buildNulls(cache);
+
+ // Load the nulls
+
+ resolvedTuple.loadNulls(input.rowSet().rowCount());
+
+ // Do the merge
+
+ VectorContainer output = new VectorContainer(fixture.allocator());
+ resolvedTuple.project(input.rowSet().container(), output);
+ output.setRecordCount(input.rowSet().rowCount());
+ RowSet result = fixture.wrap(output);
+
+ // Verify
+
+ BatchSchema expectedSchema = new SchemaBuilder()
+ .add("b", MinorType.VARCHAR)
+ .addMapArray("a")
+ .add("c", MinorType.INT)
+ .addNullable("null1", MinorType.INT)
+ .resumeSchema()
+ .build();
+ RowSet expected = fixture.rowSetBuilder(expectedSchema)
+ .addRow("barney", mapArray(
+ mapValue(10, null), mapValue(11, null), mapValue(12, null)))
+ .addRow("wilma", mapArray(
+ mapValue(20, null), mapValue(21, null)))
+ .build();
+
+ new RowSetComparison(expected)
+ .verifyAndClearAll(result);
+ }
+
+}