aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java
diff options
context:
space:
mode:
authorSorabh Hamirwasia <shamirwasia@maprtech.com>2018-06-29 10:27:55 -0700
committerParth Chandra <parthc@apache.org>2018-07-02 14:06:38 -0700
commit069c3049f1a500e5ae0b47caeebc5856ab182b73 (patch)
tree8c44e7ddb0e9bdf0dadd6ba9a7e400aaaa02cb8a /exec/java-exec/src/main/java
parent208733b52ec40fd49e6bd424782f7c71aabef7e3 (diff)
DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules
This closes #1356
Diffstat (limited to 'exec/java-exec/src/main/java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java71
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java19
4 files changed, 85 insertions, 15 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 047c59705..345d18200 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -67,9 +68,6 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
-import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;
-
/**
* This class implements the runtime execution for the Hash-Join operator
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -892,7 +890,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
- batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
+ batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
+ logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 84dc5c344..fc3c8b188 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
@@ -27,6 +28,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.LateralContract;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -34,10 +36,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.HashSet;
+import java.util.List;
+
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Flag to keep track of new left batch so that update on memory manager is called only once per left batch
private boolean isNewLeftBatch = false;
+ private final HashSet<String> excludedFieldNames = new HashSet<>();
+
/* ****************************************************************************************************************
* Public Methods
* ****************************************************************************************************************/
@@ -91,7 +99,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);
final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
- batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
+ // Prepare Schema Path Mapping
+ populateExcludedField(popConfig);
+ batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);
// Initially it's set to default value of 64K and later for each new output row it will be set to the computed
// row count
@@ -700,6 +710,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
return isValid;
}
+ private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
+ if (excludedFieldNames.size() == 0) {
+ return originSchema;
+ }
+
+ final SchemaBuilder newSchemaBuilder =
+ BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
+ for (MaterializedField field : originSchema) {
+ if (!excludedFieldNames.contains(field.getName())) {
+ newSchemaBuilder.addField(field);
+ }
+ }
+ return newSchemaBuilder.build();
+ }
+
/**
* Helps to create the outgoing container vectors based on known left and right batch schemas
* @throws SchemaChangeException
@@ -711,8 +736,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Clear up the container
container.clear();
- leftSchema = left.getSchema();
- rightSchema = right.getSchema();
+ leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
+ rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());
if (!verifyInputSchema(leftSchema)) {
throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -724,12 +749,20 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Setup LeftSchema in outgoing container
for (final VectorWrapper<?> vectorWrapper : left) {
- container.addOrGet(vectorWrapper.getField());
+ final MaterializedField leftField = vectorWrapper.getField();
+ if (excludedFieldNames.contains(leftField.getName())) {
+ continue;
+ }
+ container.addOrGet(leftField);
}
// Setup RightSchema in the outgoing container
for (final VectorWrapper<?> vectorWrapper : right) {
MaterializedField rightField = vectorWrapper.getField();
+ if (excludedFieldNames.contains(rightField.getName())) {
+ continue;
+ }
+
TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();
// make right input schema optional if we have LEFT join
@@ -846,15 +879,28 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Get the vectors using field index rather than Materialized field since input batch field can be different from
// output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
// container.
+ int inputIndex = 0;
for (int i = startVectorIndex; i < endVectorIndex; ++i) {
- // Get input vector
- final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass();
- final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector();
-
// Get output vector
final int outputVectorIndex = i + baseVectorIndex;
final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
+ final String outputFieldName = outputVector.getField().getName();
+
+ ValueVector inputVector;
+ Class<?> inputValueClass;
+ String inputFieldName;
+ do {
+ // Get input vector
+ inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
+ inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
+ inputFieldName = inputVector.getField().getName();
+ ++inputIndex;
+ } while (excludedFieldNames.contains(inputFieldName));
+
+ Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
+ new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
+ ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));
logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
"(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
@@ -938,4 +984,13 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
maxOutputRowCount = newOutputRowCount;
}
}
+
+ private void populateExcludedField(PhysicalOperator lateralPop) {
+ final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns();
+ if (excludedCols != null) {
+ for (SchemaPath currentPath : excludedCols) {
+ excludedFieldNames.add(currentPath.rootName());
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 62967a9fa..ea34ed930 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -62,6 +62,7 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
@@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
private class MergeJoinMemoryManager extends JoinBatchMemoryManager {
MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
- super(outputBatchSize, leftBatch, rightBatch);
+ super(outputBatchSize, leftBatch, rightBatch, new HashSet<>());
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 2ebe88783..4344e1374 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -17,29 +17,44 @@
*/
package org.apache.drill.exec.record;
+import java.util.Set;
+
public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);
private int rowWidth[];
private RecordBatch recordBatch[];
+ private Set<String> columnsToExclude;
private static final int numInputs = 2;
public static final int LEFT_INDEX = 0;
public static final int RIGHT_INDEX = 1;
- public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
+ public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
+ RecordBatch rightBatch, Set<String> excludedColumns) {
super(numInputs, outputBatchSize);
recordBatch = new RecordBatch[numInputs];
recordBatch[LEFT_INDEX] = leftBatch;
recordBatch[RIGHT_INDEX] = rightBatch;
rowWidth = new int[numInputs];
+ this.columnsToExclude = excludedColumns;
}
private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
- final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
+ // Reduce the width of excluded columns from actual rowWidth
+ for (String columnName : columnsToExclude) {
+ final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
+ if (currentColSizer == null) {
+ continue;
+ }
+ rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
+ }
+
+ // Get final net outgoing row width after reducing the excluded columns width
+ int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
// If outgoing row width is 0 or there is no change in outgoing row width, just return.
// This is possible for empty batches or