diff options
author | Sorabh Hamirwasia <shamirwasia@maprtech.com> | 2018-06-29 10:27:55 -0700 |
---|---|---|
committer | Parth Chandra <parthc@apache.org> | 2018-07-02 14:06:38 -0700 |
commit | 069c3049f1a500e5ae0b47caeebc5856ab182b73 (patch) | |
tree | 8c44e7ddb0e9bdf0dadd6ba9a7e400aaaa02cb8a /exec/java-exec/src/main/java | |
parent | 208733b52ec40fd49e6bd424782f7c71aabef7e3 (diff) |
DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules
This closes #1356
Diffstat (limited to 'exec/java-exec/src/main/java')
4 files changed, 85 insertions, 15 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 047c59705..345d18200 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -67,9 +68,6 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; -import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX; -import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX; - /** * This class implements the runtime execution for the Hash-Join operator * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins @@ -892,7 +890,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}", configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); - batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right); + batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>()); + logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 84dc5c344..fc3c8b188 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.join; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; @@ -27,6 +28,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.LateralContract; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; @@ -34,10 +36,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.SchemaBuilder; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; +import java.util.HashSet; +import java.util.List; + import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; @@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Flag to keep track of new left batch so that update on memory manager is called only once per left batch private boolean isNewLeftBatch = false; + private final HashSet<String> excludedFieldNames = new HashSet<>(); + /* **************************************************************************************************************** * Public Methods * ****************************************************************************************************************/ @@ -91,7 +99,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> Preconditions.checkNotNull(left); Preconditions.checkNotNull(right); final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right); + // Prepare Schema Path Mapping + populateExcludedField(popConfig); + batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames); // Initially it's set to default value of 64K and later for each new output row it will be set to the computed // row count @@ -700,6 +710,21 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> return isValid; } + private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) { + if (excludedFieldNames.size() == 0) { + return originSchema; + } + + final SchemaBuilder newSchemaBuilder = + BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode()); + for (MaterializedField field : originSchema) { + if (!excludedFieldNames.contains(field.getName())) { + newSchemaBuilder.addField(field); + } + } + return newSchemaBuilder.build(); + } + /** * Helps to create the outgoing container vectors based on known left and right batch schemas * @throws SchemaChangeException @@ -711,8 +736,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Clear up the container container.clear(); - leftSchema = left.getSchema(); - rightSchema = right.getSchema(); + leftSchema = batchSchemaWithNoExcludedCols(left.getSchema()); + rightSchema = batchSchemaWithNoExcludedCols(right.getSchema()); if (!verifyInputSchema(leftSchema)) { throw new SchemaChangeException("Invalid Schema found for left incoming batch"); @@ -724,12 +749,20 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Setup LeftSchema in outgoing container for (final VectorWrapper<?> vectorWrapper : left) { - container.addOrGet(vectorWrapper.getField()); + final MaterializedField leftField = vectorWrapper.getField(); + if (excludedFieldNames.contains(leftField.getName())) { + continue; + } + container.addOrGet(leftField); } // Setup RightSchema in the outgoing container for (final VectorWrapper<?> vectorWrapper : right) { MaterializedField rightField = vectorWrapper.getField(); + if (excludedFieldNames.contains(rightField.getName())) { + continue; + } + TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType(); // make right input schema optional if we have LEFT join @@ -846,15 +879,28 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // Get the vectors using field index rather than Materialized field since input batch field can be different from // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output // container. + int inputIndex = 0; for (int i = startVectorIndex; i < endVectorIndex; ++i) { - // Get input vector - final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass(); - final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector(); - // Get output vector final int outputVectorIndex = i + baseVectorIndex; final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass(); final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector(); + final String outputFieldName = outputVector.getField().getName(); + + ValueVector inputVector; + Class<?> inputValueClass; + String inputFieldName; + do { + // Get input vector + inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass(); + inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector(); + inputFieldName = inputVector.getField().getName(); + ++inputIndex; + } while (excludedFieldNames.contains(inputFieldName)); + + Preconditions.checkArgument(outputFieldName.equals(inputFieldName), + new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" + + ". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema()))); logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " + "(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," + @@ -938,4 +984,13 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> maxOutputRowCount = newOutputRowCount; } } + + private void populateExcludedField(PhysicalOperator lateralPop) { + final List<SchemaPath> excludedCols = ((LateralJoinPOP)lateralPop).getExcludedColumns(); + if (excludedCols != null) { + for (SchemaPath currentPath : excludedCols) { + excludedFieldNames.add(currentPath.rootName()); + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 62967a9fa..ea34ed930 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -62,6 +62,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import java.io.IOException; +import java.util.HashSet; import java.util.List; import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; @@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { private class MergeJoinMemoryManager extends JoinBatchMemoryManager { MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { - super(outputBatchSize, leftBatch, rightBatch); + super(outputBatchSize, leftBatch, rightBatch, new HashSet<>()); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java index 2ebe88783..4344e1374 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -17,29 +17,44 @@ */ package org.apache.drill.exec.record; +import java.util.Set; + public class JoinBatchMemoryManager extends RecordBatchMemoryManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class); private int rowWidth[]; private RecordBatch recordBatch[]; + private Set<String> columnsToExclude; private static final int numInputs = 2; public static final int LEFT_INDEX = 0; public static final int RIGHT_INDEX = 1; - public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { + public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, + RecordBatch rightBatch, Set<String> excludedColumns) { super(numInputs, outputBatchSize); recordBatch = new RecordBatch[numInputs]; recordBatch[LEFT_INDEX] = leftBatch; recordBatch[RIGHT_INDEX] = rightBatch; rowWidth = new int[numInputs]; + this.columnsToExclude = excludedColumns; } private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) { updateIncomingStats(inputIndex); rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth(); - final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX]; + // Reduce the width of excluded columns from actual rowWidth + for (String columnName : columnsToExclude) { + final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName); + if (currentColSizer == null) { + continue; + } + rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry(); + } + + // Get final net outgoing row width after reducing the excluded columns width + int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX]; // If outgoing row width is 0 or there is no change in outgoing row width, just return. // This is possible for empty batches or |