author    Ben-Zvi <bben-zvi@mapr.com>  2018-08-28 15:57:04 -0700
committer Boaz Ben-Zvi <boaz@mapr.com> 2018-08-29 11:51:42 -0700
commit    5ed5df2cef7b3aac83401a3df6bbf9716505b226 (patch)
tree      1f7ed17e48b1f9e5a6cec43719405105ce3f7f1d /exec/java-exec/src/test/java/org/apache/drill/exec/physical
parent    e43cfebe8229bc8cd01ba5a3eafeeef987f5e425 (diff)
DRILL-6517: Hash Join handling uninitialized vector container
Diffstat (limited to 'exec/java-exec/src/test/java/org/apache/drill/exec/physical')
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java          |   8
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java | 212
2 files changed, 215 insertions(+), 5 deletions(-)
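
For context before the diff: a minimal, self-contained sketch of the record-count guard that this patch adds to MockRecordBatch.next(). The SimpleContainer class is a hypothetical stand-in for Drill's VectorContainer, modeling only the hasRecordCount/setRecordCount/getRecordCount calls that appear in the diff; it illustrates the pattern and is not part of the commit.

// Hypothetical stand-in for Drill's VectorContainer record-count behavior.
class SimpleContainer {
  private int recordCount = -1;                  // -1 means the count was never set

  boolean hasRecordCount() { return recordCount != -1; }

  void setRecordCount(int count) { recordCount = count; }

  int getRecordCount() {
    if (!hasRecordCount()) {                     // uninitialized container: reading the count fails
      throw new IllegalStateException("Record count not set for this vector container");
    }
    return recordCount;
  }
}

public class RecordCountGuardSketch {
  public static void main(String[] args) {
    SimpleContainer input = new SimpleContainer();   // uninitialized, like vc2 in the new test
    SimpleContainer output = new SimpleContainer();

    // The guarded transfer the patch introduces: only propagate a count that was actually set.
    if (input.hasRecordCount()) {
      output.setRecordCount(input.getRecordCount());
    }
    System.out.println("output count set: " + output.hasRecordCount());  // prints: output count set: false
  }
}

In the MockRecordBatch change below, the same guard lets the mock feed the operator under test a container whose record count was never set, which is exactly the condition the new TestHashJoinOutcome exercises.
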
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
index 9e048b050..34d735e76 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/MockRecordBatch.java
@@ -202,7 +202,6 @@ public class MockRecordBatch implements CloseableRecordBatch {
if (currentContainerIndex < rowSets.size()) {
final RowSet rowSet = rowSets.get(currentContainerIndex);
final VectorContainer input = rowSet.container();
- final int recordCount = input.getRecordCount();
// We need to do this since the downstream operator expects vector reference to be same
// after first next call in cases when schema is not changed
final BatchSchema inputSchema = input.getSchema();
@@ -215,7 +214,9 @@ public class MockRecordBatch implements CloseableRecordBatch {
case NONE:
case TWO_BYTE:
container.transferIn(input);
- container.setRecordCount(recordCount);
+ if ( input.hasRecordCount() ) { // in case special test of uninitialized input container
+ container.setRecordCount(input.getRecordCount());
+ }
final SelectionVector2 inputSv2 = ((RowSet.SingleRowSet) rowSet).getSv2();
if (sv2 != null) {
@@ -257,10 +258,7 @@ public class MockRecordBatch implements CloseableRecordBatch {
case NONE:
case STOP:
case OUT_OF_MEMORY:
- //case OK_NEW_SCHEMA:
isDone = true;
- container.setRecordCount(0);
- return currentOutcome;
case NOT_YET:
container.setRecordCount(0);
return currentOutcome;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
new file mode 100644
index 000000000..349a29511
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.test.PhysicalOpUnitTestBase;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.exec.store.mock.MockStorePOP;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests of the Hash Join getting various outcomes as input
+ * with uninitialized vector containers
+ */
+@Category(OperatorTest.class)
+public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashJoinOutcome.class);
+
+ // input batch schemas
+ private static TupleSchema inputSchemaRight;
+ private static TupleSchema inputSchemaLeft;
+ private static BatchSchema batchSchemaRight;
+ private static BatchSchema batchSchemaLeft;
+
+ // Input containers -- where row count is not set for the 2nd container !!
+ private List<VectorContainer> uninitialized2ndInputContainersRight = new ArrayList<>(5);
+ private List<VectorContainer> uninitialized2ndInputContainersLeft = new ArrayList<>(5);
+
+ private RowSet.SingleRowSet emptyInputRowSetRight;
+ private RowSet.SingleRowSet emptyInputRowSetLeft;
+
+ // default Non-Empty input RowSets
+ private RowSet.SingleRowSet nonEmptyInputRowSetRight;
+ private RowSet.SingleRowSet nonEmptyInputRowSetLeft;
+
+ // List of incoming containers
+ private final List<VectorContainer> inputContainerRight = new ArrayList<>(5);
+ private final List<VectorContainer> inputContainerLeft = new ArrayList<>(5);
+
+ // List of incoming IterOutcomes
+ private final List<RecordBatch.IterOutcome> inputOutcomesRight = new ArrayList<>(5);
+ private final List<RecordBatch.IterOutcome> inputOutcomesLeft = new ArrayList<>(5);
+
+ @BeforeClass
+ public static void setUpBeforeClass() {
+ inputSchemaRight = (TupleSchema) new SchemaBuilder()
+ .add("rightcol", TypeProtos.MinorType.INT)
+ .buildSchema();
+ batchSchemaRight = inputSchemaRight.toBatchSchema(BatchSchema.SelectionVectorMode.NONE);
+ inputSchemaLeft = (TupleSchema) new SchemaBuilder()
+ .add("leftcol", TypeProtos.MinorType.INT)
+ .buildSchema();
+ batchSchemaLeft = inputSchemaLeft.toBatchSchema(BatchSchema.SelectionVectorMode.NONE);
+ }
+
+ private void prepareUninitContainers(List<VectorContainer> emptyInputContainers,
+ BatchSchema batchSchema) {
+ BufferAllocator allocator = operatorFixture.getFragmentContext().getAllocator();
+
+ VectorContainer vc1 = new VectorContainer(allocator, batchSchema);
+ // set for first vc (with OK_NEW_SCHEMA) because record count is checked at AbstractRecordBatch.next
+ vc1.setRecordCount(0);
+ VectorContainer vc2 = new VectorContainer(allocator, batchSchema);
+ // Note - Uninitialized: Record count NOT SET for vc2 !!
+ emptyInputContainers.add(vc1);
+ emptyInputContainers.add(vc2);
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+
+ prepareUninitContainers(uninitialized2ndInputContainersLeft, batchSchemaLeft);
+
+ prepareUninitContainers(uninitialized2ndInputContainersRight, batchSchemaRight);
+
+ nonEmptyInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight)
+ .addRow(123)
+ .build();
+ nonEmptyInputRowSetLeft = operatorFixture.rowSetBuilder(inputSchemaLeft)
+ .addRow(123)
+ .build();
+
+ // Prepare various (empty/non-empty) containers for each side of the join
+ emptyInputRowSetLeft = operatorFixture.rowSetBuilder(inputSchemaLeft).build();
+ emptyInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).build();
+
+ inputContainerRight.add(emptyInputRowSetRight.container());
+ inputContainerRight.add(nonEmptyInputRowSetRight.container());
+
+ inputContainerLeft.add(emptyInputRowSetLeft.container());
+ inputContainerLeft.add(nonEmptyInputRowSetLeft.container());
+
+ final PhysicalOperator mockPopConfig = new MockStorePOP(null);
+ mockOpContext(mockPopConfig, 0, 0);
+ }
+
+ @After
+ public void afterTest() {
+ emptyInputRowSetRight.clear();
+ emptyInputRowSetLeft.clear();
+ nonEmptyInputRowSetRight.clear();
+ nonEmptyInputRowSetLeft.clear();
+ inputContainerRight.clear();
+ inputOutcomesRight.clear();
+ inputContainerLeft.clear();
+ inputOutcomesLeft.clear();
+ }
+
+ enum UninitializedSide { // which side of the join has an uninitialized container
+ Right(true), Left(false);
+ public boolean isRight;
+ UninitializedSide(boolean which) {this.isRight = which;}
+ }
+
+ /**
+ * Run the Hash Join where one side has an uninitialized container (the 2nd one)
+ * @param uninitializedSide Which side (right or left) has the uninitialized container
+ * @param specialOutcome What outcome the uninitialized container has
+ * @param expectedOutcome What result outcome is expected
+ */
+ private void testHashJoinOutcomes(UninitializedSide uninitializedSide, RecordBatch.IterOutcome specialOutcome,
+ RecordBatch.IterOutcome expectedOutcome) {
+
+ inputOutcomesLeft.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomesLeft.add( uninitializedSide.isRight ? RecordBatch.IterOutcome.OK : specialOutcome);
+
+ inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomesRight.add( uninitializedSide.isRight ? specialOutcome : RecordBatch.IterOutcome.OK);
+
+ final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ uninitializedSide.isRight ? uninitialized2ndInputContainersRight : inputContainerRight,
+ inputOutcomesRight, batchSchemaRight);
+ final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ uninitializedSide.isRight ? inputContainerLeft : uninitialized2ndInputContainersLeft,
+ inputOutcomesLeft, batchSchemaLeft);
+
+ List<JoinCondition> conditions = Lists.newArrayList();
+
+ conditions.add(new JoinCondition( SqlKind.EQUALS.toString(),
+ FieldReference.getWithQuotedRef("leftcol"),
+ FieldReference.getWithQuotedRef("rightcol")));
+
+ HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER);
+
+ HashJoinBatch hjBatch = new HashJoinBatch(hjConf,operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight );
+
+ RecordBatch.IterOutcome gotOutcome = hjBatch.next();
+ assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA );
+
+ gotOutcome = hjBatch.next();
+ assertTrue(gotOutcome == expectedOutcome); // verify returned outcome
+ }
+
+ @Test
+ public void testHashJoinStopOutcomeUninitRightSide() {
+ testHashJoinOutcomes(UninitializedSide.Right, RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP);
+ }
+
+ @Test
+ public void testHashJoinStopOutcomeUninitLeftSide() {
+ testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.STOP, RecordBatch.IterOutcome.STOP);
+ }
+
+ @Test
+ public void testHashJoinNoneOutcomeUninitRightSide() {
+ testHashJoinOutcomes(UninitializedSide.Right, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
+ }
+
+ @Test
+ public void testHashJoinNoneOutcomeUninitLeftSide() {
+ testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
+ }
+}