diff options
author | Amit Hadke <amit.hadke@gmail.com> | 2015-10-07 18:23:21 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-11-04 20:55:54 -0800 |
commit | 2bc16a90b4883b2b2e3213f5e7a46ff1ea78bd98 (patch) | |
tree | a74cf223fcd8b9ff117eb24e5b0657f6899ec445 /exec | |
parent | 39582bd60c9eaaaa9b16aba4f099d434e927e7e5 (diff) |
DRILL-3793: New MergeJoin and add RecordIterator interface
This closes #190
Diffstat (limited to 'exec')
14 files changed, 730 insertions, 796 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index 8f72d327d..f7154f822 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -18,177 +18,82 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.record.RecordIterator; import org.apache.calcite.rel.core.JoinRelType; /** - * The status of the current join. Maintained outside the individually compiled join templates so that we can carry status across multiple schemas. + * Maintain join state. */ public final class JoinStatus { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class); private static final int OUTPUT_BATCH_SIZE = 32*1024; - public static enum RightSourceMode { - INCOMING, SV4; - } - - private static enum InitState { - INIT, // initial state - CHECK, // need to check if batches are empty - READY // read to do work - } - - private static final int LEFT_INPUT = 0; - private static final int RIGHT_INPUT = 1; - - public final RecordBatch left; - private int leftPosition; - private IterOutcome lastLeft; - - public final RecordBatch right; - private int rightPosition; - private int svRightPosition; - private IterOutcome lastRight; + public final RecordIterator left; + public final RecordIterator right; + private boolean iteratorInitialized; private int outputPosition; - public RightSourceMode rightSourceMode = RightSourceMode.INCOMING; public MergeJoinBatch outputBatch; - public SelectionVector4 sv4; - - private boolean hasIntermediateData; - private int initialRightPosition = -1; - private boolean crossedBatchBoundaries; private final JoinRelType joinType; + private boolean allowMarking; public boolean ok = true; - private InitState initialSet = InitState.INIT; - private boolean leftRepeating = false; - public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) { - super(); + public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch output) { this.left = left; this.right = right; this.outputBatch = output; this.joinType = output.getJoinType(); + this.iteratorInitialized = false; + this.allowMarking = true; } @Override public String toString() { return - super.toString() + super.toString() + "[" - + "leftPosition = " + leftPosition - + ", rightPosition = " + rightPosition - + ", svRightPosition = " + svRightPosition + + "leftPosition = " + left.getCurrentPosition() + + ", rightPosition = " + right.getCurrentPosition() + ", outputPosition = " + outputPosition - + ", lastLeft = " + lastLeft - + ", lastRight = " + lastRight - + ", rightSourceMode = " + rightSourceMode - + ", sv4 = " + sv4 + ", joinType = " + joinType + ", ok = " + ok - + ", initialSet = " + initialSet - + ", leftRepeating = " + leftRepeating + + ", initialSet = " + iteratorInitialized + ", left = " + left + ", right = " + right + ", outputBatch = " + outputBatch + "]"; } - public boolean hasIntermediateData() { - return hasIntermediateData; - } - - public void resetIntermediateData() { - hasIntermediateData = false; - } - - public void setIntermediateData(int initialRightPosition, boolean crossedBatchBoundaries) { - this.initialRightPosition = initialRightPosition; - this.crossedBatchBoundaries = crossedBatchBoundaries; - this.hasIntermediateData = true; - } - - public int getInitialRightPosition() { - return initialRightPosition; - } - - public boolean getCrossedBatchBoundaries() { - return crossedBatchBoundaries; - } - - private final IterOutcome nextLeft() { - return outputBatch.next(LEFT_INPUT, left); - } - - private final IterOutcome nextRight() { - return outputBatch.next(RIGHT_INPUT, right); - } - - public final void ensureInitial() { - switch(initialSet) { - case INIT: - this.lastLeft = nextLeft(); - this.lastRight = nextRight(); - initialSet = InitState.CHECK; - break; - case CHECK: - if (lastLeft != IterOutcome.NONE && left.getRecordCount() == 0) { - this.lastLeft = nextLeft(); - } - if (lastRight != IterOutcome.NONE && right.getRecordCount() == 0) { - this.lastRight = nextRight(); - } - initialSet = InitState.READY; - // fall through - default: - break; + // Initialize left and right record iterator. We avoid doing this in constructor. + // Callers must check state of each iterator after calling ensureInitial. + public void initialize() { + if (!iteratorInitialized) { + left.next(); + right.next(); + iteratorInitialized = true; } } - public final void advanceLeft() { - leftPosition++; - } - - public final void advanceRight() { - if (rightSourceMode == RightSourceMode.INCOMING) { - rightPosition++; - } else { - svRightPosition++; + public void prepare() { + if (!iteratorInitialized) { + initialize(); } + left.prepare(); + right.prepare(); } - public final int getLeftPosition() { - return leftPosition; - } - - public final int getRightPosition() { - return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition; - } - - public final int getRightCount() { - return right.getRecordCount(); - } - - public final void setRightPosition(int pos) { - rightPosition = pos; - } + public IterOutcome getLeftStatus() { return left.getLastOutcome(); } + public IterOutcome getRightStatus() { return right.getLastOutcome(); } public final int getOutPosition() { return outputPosition; } - public final int fetchAndIncOutputPos() { - return outputPosition++; - } - public final void resetOutputPos() { outputPosition = 0; } @@ -198,145 +103,61 @@ public final class JoinStatus { } public final void incOutputPos() { - outputPosition++; - } - - public final void notifyLeftRepeating() { - leftRepeating = true; - outputBatch.resetBatchBuilder(); - } - - public final void notifyLeftStoppedRepeating() { - leftRepeating = false; - svRightPosition = 0; + ++outputPosition; } - public final boolean isLeftRepeating() { - return leftRepeating; + public void disableMarking() { + allowMarking = false; } - public void setDefaultAdvanceMode() { - rightSourceMode = RightSourceMode.INCOMING; + public void enableMarking() { + allowMarking = true; } - public void setSV4AdvanceMode() { - rightSourceMode = RightSourceMode.SV4; - svRightPosition = 0; + public boolean shouldMark() { + return allowMarking; } /** - * Check if the left record position can advance by one. - * Side effect: advances to next left batch if current left batch size is exceeded. + * Return state of join based on status of left and right iterator. + * @return + * 1. JoinOutcome.NO_MORE_DATA : Join is finished + * 2. JoinOutcome.FAILURE : There is an error during join. + * 3. JoinOutcome.BATCH_RETURNED : one of the side has data + * 4. JoinOutcome.SCHEMA_CHANGED : one of the side has change in schema. */ - public final boolean isLeftPositionAllowed() { - if (lastLeft == IterOutcome.NONE) { - return false; - } - if (!isLeftPositionInCurrentBatch()) { - leftPosition = 0; - releaseData(left); - lastLeft = nextLeft(); - return lastLeft == IterOutcome.OK; - } - lastLeft = IterOutcome.OK; - return true; - } - - /** - * Check if the right record position can advance by one. - * Side effect: advances to next right batch if current right batch size is exceeded - */ - public final boolean isRightPositionAllowed() { - if (rightSourceMode == RightSourceMode.SV4) { - return svRightPosition < sv4.getCount(); - } - if (lastRight == IterOutcome.NONE) { - return false; - } - if (!isRightPositionInCurrentBatch()) { - rightPosition = 0; - releaseData(right); - lastRight = nextRight(); - return lastRight == IterOutcome.OK; - } - lastRight = IterOutcome.OK; - return true; - } - - private void releaseData(RecordBatch b) { - for (VectorWrapper<?> v : b) { - v.clear(); - } - if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { - b.getSelectionVector2().clear(); - } - } - - /** - * Check if the left record position can advance by one in the current batch. - */ - public final boolean isLeftPositionInCurrentBatch() { - return leftPosition < left.getRecordCount(); - } - - /** - * Check if the right record position can advance by one in the current batch. - */ - public final boolean isRightPositionInCurrentBatch() { - return rightPosition < right.getRecordCount(); - } - - /** - * Check if the next left record position can advance by one in the current batch. - */ - public final boolean isNextLeftPositionInCurrentBatch() { - return leftPosition + 1 < left.getRecordCount(); - } - - public IterOutcome getLastRight() { - return lastRight; - } - - public IterOutcome getLastLeft() { - return lastLeft; - } - - /** - * Check if the next left record position can advance by one in the current batch. - */ - public final boolean isNextRightPositionInCurrentBatch() { - return rightPosition + 1 < right.getRecordCount(); - } - public JoinOutcome getOutcome() { if (!ok) { return JoinOutcome.FAILURE; } if (bothMatches(IterOutcome.NONE) || - (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || - (joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) || - (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) { + (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || + (joinType == JoinRelType.LEFT && getLeftStatus() == IterOutcome.NONE) || + (joinType == JoinRelType.RIGHT && getRightStatus() == IterOutcome.NONE)) { return JoinOutcome.NO_MORE_DATA; } if (bothMatches(IterOutcome.OK) || - (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) { + (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) { return JoinOutcome.BATCH_RETURNED; } if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) { return JoinOutcome.SCHEMA_CHANGED; } + // should never see NOT_YET if (eitherMatches(IterOutcome.NOT_YET)) { return JoinOutcome.WAITING; } + ok = false; + // on STOP, OUT_OF_MEMORY return FAILURE. return JoinOutcome.FAILURE; } private boolean bothMatches(IterOutcome outcome) { - return lastLeft == outcome && lastRight == outcome; + return getLeftStatus() == outcome && getRightStatus() == outcome; } private boolean eitherMatches(IterOutcome outcome) { - return lastLeft == outcome || lastRight == outcome; + return getLeftStatus() == outcome || getRightStatus() == outcome; } -} +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index bb43e83dc..ed900dbae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -26,52 +26,7 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.calcite.rel.core.JoinRelType; /** - * This join template uses a merge join to combine two ordered streams into a single larger batch. When joining - * single values on each side, the values can be copied to the outgoing batch immediately. The outgoing record batch - * should be sent as needed (e.g. schema change or outgoing batch full). When joining multiple values on one or - * both sides, two passes over the vectors will be made; one to construct the selection vector, and another to - * generate the outgoing batches once the duplicate value is no longer encountered. - * - * Given two tables ordered by 'col1': - * - * t1 t2 - * --------------- --------------- - * | key | col2 | | key | col2 | - * --------------- --------------- - * | 1 | 'ab' | | 1 | 'AB' | - * | 2 | 'cd' | | 2 | 'CD' | - * | 2 | 'ef' | | 4 | 'EF' | - * | 4 | 'gh' | | 4 | 'GH' | - * | 4 | 'ij' | | 5 | 'IJ' | - * --------------- --------------- - * - * 'SELECT * FROM t1 INNER JOIN t2 on (t1.key == t2.key)' should generate the following: - * - * --------------------------------- - * | t1.key | t2.key | col1 | col2 | - * --------------------------------- - * | 1 | 1 | 'ab' | 'AB' | - * | 2 | 2 | 'cd' | 'CD' | - * | 2 | 2 | 'ef' | 'CD' | - * | 4 | 4 | 'gh' | 'EF' | - * | 4 | 4 | 'gh' | 'GH' | - * | 4 | 4 | 'ij' | 'EF' | - * | 4 | 4 | 'ij' | 'GH' | - * --------------------------------- - * - * In the simple match case, only one row from each table matches. Additional cases should be considered: - * - a left join key matches multiple right join keys - * - duplicate keys which may span multiple record batches (on the left and/or right side) - * - one or both incoming record batches change schemas - * - * In the case where a left join key matches multiple right join keys: - * - add a reference to all of the right table's matching values to the SV4. - * - * A RecordBatchData object should be used to hold onto all batches which have not been sent. - * - * JoinStatus: - * - all state related to the join operation is stored in the JoinStatus object. - * - this is required since code may be regenerated before completion of an outgoing record batch. + * Merge Join implementation using RecordIterator. */ public abstract class JoinTemplate implements JoinWorker { @@ -86,138 +41,88 @@ public abstract class JoinTemplate implements JoinWorker { * @return true of join succeeded; false if the worker needs to be regenerated */ public final boolean doJoin(final JoinStatus status) { - while(!status.isOutgoingBatchFull()) { - // for each record + final boolean isLeftJoin = (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT); - // validate input iterators (advancing to the next record batch if necessary) - if (!status.isRightPositionAllowed()) { - if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { - // we've hit the end of the right record batch; copy any remaining values from the left batch - while (status.isLeftPositionAllowed()) { + while (!status.isOutgoingBatchFull()) { + if (status.right.finished()) { + if (isLeftJoin) { + while (!status.left.finished()) { if (status.isOutgoingBatchFull()) { return true; } - doCopyLeft(status.getLeftPosition(), status.getOutPosition()); - + doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition()); status.incOutputPos(); - status.advanceLeft(); + status.left.next(); } } return true; } - if (!status.isLeftPositionAllowed()) { + if (status.left.finished()) { return true; } - - int comparison = doCompare(status.getLeftPosition(), status.getRightPosition()); + final int comparison = doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition()); switch (comparison) { - - case -1: - // left key < right key - if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { - doCopyLeft(status.getLeftPosition(), status.getOutPosition()); - status.incOutputPos(); - } - status.advanceLeft(); - continue; - - case 0: - // left key == right key - - // check for repeating values on the left side - if (!status.isLeftRepeating() && - status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) == 0) { - // subsequent record(s) in the left batch have the same key - status.notifyLeftRepeating(); - } else if (status.isLeftRepeating() && - status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) != 0) { - // this record marks the end of repeated keys - status.notifyLeftStoppedRepeating(); - } - - boolean crossedBatchBoundaries; - int initialRightPosition; - if (status.hasIntermediateData()) { - crossedBatchBoundaries = status.getCrossedBatchBoundaries(); - initialRightPosition = status.getInitialRightPosition(); - status.resetIntermediateData(); - } else { - crossedBatchBoundaries = false; - initialRightPosition = status.getRightPosition(); - } - - do { - if (status.isOutgoingBatchFull()) { - status.setIntermediateData(initialRightPosition, crossedBatchBoundaries); - return true; + case -1: + // left key < right key + if (isLeftJoin) { + doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition()); + status.incOutputPos(); } - // copy all equal right keys to the output record batch - doCopyLeft(status.getLeftPosition(), status.getOutPosition()); - - doCopyRight(status.getRightPosition(), status.getOutPosition()); - - status.incOutputPos(); - - // If the left key has duplicates and we're about to cross a boundary in the right batch, add the - // right table's record batch to the sv4 builder before calling next. These records will need to be - // copied again for each duplicate left key. - if (status.isLeftRepeating() && !status.isRightPositionInCurrentBatch()) { - status.outputBatch.addRightToBatchBuilder(); - crossedBatchBoundaries = true; + status.left.next(); + continue; + + case 0: + // left key == right key + // Mark current position in right iterator. + // If we have set a mark in previous iteration but didn't finish the inner loop, + // skip current right side as its already copied in earlier iteration. + if (status.shouldMark()) { + status.right.mark(); + // Copy all equal keys from right side to the output record batch. + doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition()); + doCopyRight(status.right.getCurrentPosition(), status.getOutPosition()); + status.incOutputPos(); } - status.advanceRight(); - - } while ((!status.isLeftRepeating() || status.isRightPositionInCurrentBatch()) - && status.isRightPositionAllowed() - && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0); - - if (status.getRightPosition() > initialRightPosition && - (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) { - // more than one matching result from right table; reset position in case of subsequent left match - status.setRightPosition(initialRightPosition); - } - status.advanceLeft(); - - if (status.isLeftRepeating() && status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) != 0) { - // left no longer has duplicates. switch back to incoming batch mode - status.setDefaultAdvanceMode(); - status.notifyLeftStoppedRepeating(); - } else if (status.isLeftRepeating() && crossedBatchBoundaries) { - try { - // build the right batches and - status.outputBatch.batchBuilder.build(); - status.setSV4AdvanceMode(); - } catch (SchemaChangeException e) { - status.ok = false; + // Move to next position in right iterator. + status.right.next(); + while (!status.right.finished()) { + if (doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition()) == 0) { + doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition()); + doCopyRight(status.right.getCurrentPosition(), status.getOutPosition()); + status.incOutputPos(); + if (status.isOutgoingBatchFull()) { + // Leave iterators at their current positions and markers. + // Don't mark on all subsequent doJoin iterations. + status.disableMarking(); + return true; + } + status.right.next(); + } else { + break; + } } - // return to indicate recompile in right-sv4 mode - return true; - } - - continue; - - case 1: - // left key > right key - status.advanceRight(); - continue; - - default: - throw new IllegalStateException(); + status.right.reset(); + // Enable marking only when we have consumed all equal keys on right side. + status.enableMarking(); + status.left.next(); + continue; + case 1: + // left key > right key + status.right.next(); + continue; + + default: + throw new IllegalStateException(); } } - return true; } // Generated Methods public abstract void doSetup(@Named("context") FragmentContext context, - @Named("status") JoinStatus status, - @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException; - + @Named("status") JoinStatus status, + @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException; /** * Copy the data to the new record batch (if it fits). @@ -241,15 +146,5 @@ public abstract class JoinTemplate implements JoinWorker { * 1 if left is > right */ protected abstract int doCompare(@Named("leftIndex") int leftIndex, - @Named("rightIndex") int rightIndex); - - - /** - * Compare the current left key to the next left key, if it's within the batch. - * @return 0 if both keys are equal, - * 1 if the keys are not equal, and - * -1 if there are no more keys in this batch - */ - protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex); - -} + @Named("rightIndex") int rightIndex); +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java index d39df8f17..2476a838a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java @@ -37,6 +37,7 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.resolver.TypeCastRules; import java.util.LinkedList; @@ -162,8 +163,8 @@ public class JoinUtils { * @param rightBatch right input record batch * @param context fragment context */ - public static void addLeastRestrictiveCasts(LogicalExpression[] leftExpressions, RecordBatch leftBatch, - LogicalExpression[] rightExpressions, RecordBatch rightBatch, + public static void addLeastRestrictiveCasts(LogicalExpression[] leftExpressions, VectorAccessible leftBatch, + LogicalExpression[] rightExpressions, VectorAccessible rightBatch, FragmentContext context) { assert rightExpressions.length == leftExpressions.length; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java index e4e13d1ec..55322f849 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -32,6 +32,7 @@ public interface JoinWorker { public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; public boolean doJoin(JoinStatus status); - public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class); + public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>( + JoinWorker.class, JoinTemplate.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 2e777f66b..96113e966 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; import java.io.IOException; import java.util.List; +import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -43,74 +44,62 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator; -import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordIterator; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; -import org.apache.calcite.rel.core.JoinRelType; import com.google.common.base.Preconditions; import com.sun.codemodel.JClass; import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; import com.sun.codemodel.JMod; -import com.sun.codemodel.JType; import com.sun.codemodel.JVar; /** - * A merge join combining to incoming in-order batches. + * A join operator merges two sorted streams using record iterator. */ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class); - public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; - public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; - public final MappingSet setupMapping = - new MappingSet("null", "null", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doSetup", null, null)); + new MappingSet("null", "null", + GM("doSetup", "doSetup", null, null), + GM("doSetup", "doSetup", null, null)); public final MappingSet copyLeftMapping = - new MappingSet("leftIndex", "outIndex", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doCopyLeft", null, null)); + new MappingSet("leftIndex", "outIndex", + GM("doSetup", "doSetup", null, null), + GM("doSetup", "doCopyLeft", null, null)); public final MappingSet copyRightMappping = - new MappingSet("rightIndex", "outIndex", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doCopyRight", null, null)); + new MappingSet("rightIndex", "outIndex", + GM("doSetup", "doSetup", null, null), + GM("doSetup", "doCopyRight", null, null)); public final MappingSet compareMapping = - new MappingSet("leftIndex", "rightIndex", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doCompare", null, null)); + new MappingSet("leftIndex", "rightIndex", + GM("doSetup", "doSetup", null, null), + GM("doSetup", "doCompare", null, null)); public final MappingSet compareRightMapping = - new MappingSet("rightIndex", "null", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doCompare", null, null)); - public final MappingSet compareLeftMapping = - new MappingSet("leftIndex", "null", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doCompareNextLeftKey", null, null)); - public final MappingSet compareNextLeftMapping = - new MappingSet("nextLeftIndex", "null", - GM("doSetup", "doSetup", null, null), - GM("doSetup", "doCompareNextLeftKey", null, null)); - + new MappingSet("rightIndex", "null", + GM("doSetup", "doSetup", null, null), + GM("doSetup", "doCompare", null, null)); private final RecordBatch left; private final RecordBatch right; + private final RecordIterator leftIterator; + private final RecordIterator rightIterator; private final JoinStatus status; private final List<JoinCondition> conditions; private final JoinRelType joinType; private JoinWorker worker; - public MergeJoinBatchBuilder batchBuilder; private boolean areNullsEqual = false; // whether nulls compare equal private static final String LEFT_INPUT = "LEFT INPUT"; @@ -123,10 +112,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions"); } this.left = left; + this.leftIterator = new RecordIterator(left, this, oContext, 0); this.right = right; + this.rightIterator = new RecordIterator(right, this, oContext, 1); this.joinType = popConfig.getJoinType(); - this.status = new JoinStatus(left, right, this); - this.batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status); + this.status = new JoinStatus(leftIterator, rightIterator, this); this.conditions = popConfig.getConditions(); JoinComparator comparator = JoinComparator.NONE; @@ -147,11 +137,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } @Override - public void buildSchema() throws SchemaChangeException { - status.ensureInitial(); + public void buildSchema() { + // initialize iterators + status.initialize(); - final IterOutcome leftOutcome = status.getLastLeft(); - final IterOutcome rightOutcome = status.getLastRight(); + final IterOutcome leftOutcome = status.getLeftStatus(); + final IterOutcome rightOutcome = status.getRightStatus(); if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) { state = BatchState.STOP; return; @@ -161,36 +152,38 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { state = BatchState.OUT_OF_MEMORY; return; } - allocateBatch(true); } @Override public IterOutcome innerNext() { // we do this in the here instead of the constructor because don't necessary want to start consuming on construction. - status.ensureInitial(); - + status.prepare(); // loop so we can start over again if we find a new batch was created. while (true) { - - JoinOutcome outcome = status.getOutcome(); - // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. - if (outcome == JoinOutcome.SCHEMA_CHANGED) { - allocateBatch(true); - } else if (outcome == JoinOutcome.BATCH_RETURNED) { - allocateBatch(false); - } - - // reset the output position to zero after our parent iterates this RecordBatch - if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED || - outcome == JoinOutcome.NO_MORE_DATA) { - status.resetOutputPos(); - } - - if (outcome == JoinOutcome.NO_MORE_DATA) { - logger.debug("NO MORE DATA; returning {} NONE"); - return IterOutcome.NONE; + // Check result of last iteration. + switch (status.getOutcome()) { + case BATCH_RETURNED: + allocateBatch(false); + status.resetOutputPos(); + break; + case SCHEMA_CHANGED: + allocateBatch(true); + status.resetOutputPos(); + break; + case NO_MORE_DATA: + status.resetOutputPos(); + logger.debug("NO MORE DATA; returning {} NONE"); + return IterOutcome.NONE; + case FAILURE: + status.left.clearInflightBatches(); + status.right.clearInflightBatches(); + kill(false); + return IterOutcome.STOP; + case WAITING: + return IterOutcome.NOT_YET; + default: + throw new IllegalStateException(); } boolean first = false; @@ -214,36 +207,39 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { worker = null; } - // get the outcome of the join. + // get the outcome of the last join iteration. switch (status.getOutcome()) { - case BATCH_RETURNED: - // only return new schema if new worker has been setup. - logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); - setRecordCountInContainer(); - return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; - case FAILURE: - kill(false); - return IterOutcome.STOP; - case NO_MORE_DATA: - logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE"))); - setRecordCountInContainer(); - state = BatchState.DONE; - return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE); - case SCHEMA_CHANGED: - worker = null; - if (status.getOutPosition() > 0) { - // if we have current data, let's return that. - logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); + case BATCH_RETURNED: + // only return new schema if new worker has been setup. + logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); setRecordCountInContainer(); return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; - }else{ - // loop again to rebuild worker. - continue; - } - case WAITING: - return IterOutcome.NOT_YET; - default: - throw new IllegalStateException(); + case FAILURE: + status.left.clearInflightBatches(); + status.right.clearInflightBatches(); + kill(false); + return IterOutcome.STOP; + case NO_MORE_DATA: + logger.debug("NO MORE DATA; returning {}", + (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" : "NONE"))); + setRecordCountInContainer(); + state = BatchState.DONE; + return (first? IterOutcome.OK_NEW_SCHEMA : (status.getOutPosition() > 0 ? IterOutcome.OK: IterOutcome.NONE)); + case SCHEMA_CHANGED: + worker = null; + if (status.getOutPosition() > 0) { + // if we have current data, let's return that. + logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); + setRecordCountInContainer(); + return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + } else{ + // loop again to rebuild worker. + continue; + } + case WAITING: + return IterOutcome.NOT_YET; + default: + throw new IllegalStateException(); } } } @@ -255,13 +251,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } } - public void resetBatchBuilder() { - batchBuilder.close(); - batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status); - } - - public void addRightToBatchBuilder() { - batchBuilder.add(right); + @Override + public void close() { + super.close(); + leftIterator.close(); + rightIterator.close(); } @Override @@ -270,69 +264,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { right.kill(sendUpstream); } - private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, - LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, JVar joinStatus, - ErrorCollector collector) throws ClassTransformationException { - boolean nextLeftIndexDeclared = false; - - cg.setMappingSet(compareLeftMapping); - - for (int i = 0; i < leftExpression.length; i++) { - - // materialize value vector readers from join expression - final LogicalExpression materializedLeftExpr = leftExpression[i]; - - // generate compareNextLeftKey() - //////////////////////////////// - cg.setMappingSet(compareLeftMapping); - cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch)); - - if (!nextLeftIndexDeclared) { - // int nextLeftIndex = leftIndex + 1; - cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1))); - nextLeftIndexDeclared = true; - } - // check if the next key is in this batch - cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false))) - ._then() - ._return(JExpr.lit(-1)); - - // generate VV read expressions - ClassGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false); - cg.setMappingSet(compareNextLeftMapping); // change mapping from 'leftIndex' to 'nextLeftIndex' - ClassGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false); - - if (compareThisLeftExprHolder.isOptional()) { - // handle null == null - cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0)) - .cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0)))) - ._then() - ._return(JExpr.lit(0)); - - // handle null == !null - cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0)) - .cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0)))) - ._then() - ._return(JExpr.lit(1)); - } - - // check value equality - - LogicalExpression gh = - FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareThisLeftExprHolder, - compareNextLeftExprHolder, - context.getFunctionRegistry()); - HoldingContainer out = cg.addExpr(gh, false); - - // If not 0, it means not equal. We return this out value. - JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - jc._then()._return(out.getValue()); - } - - //Pass the equality check for all the join conditions. Finally, return 0. - cg.getEvalBlock()._return(JExpr.lit(0)); - } - private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{ final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry()); @@ -353,7 +284,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { cg.getSetupBlock().assign(JExpr._this().ref(outgoingVectorContainer), JExpr.direct("outgoing")); // declare and assign incoming left RecordBatch member - JClass recordBatchClass = cg.getModel().ref(RecordBatch.class); + JClass recordBatchClass = cg.getModel().ref(RecordIterator.class); JVar incomingLeftRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingLeft"); cg.getSetupBlock().assign(JExpr._this().ref(incomingLeftRecordBatch), joinStatus.ref("left")); @@ -371,34 +302,30 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { */ LogicalExpression leftExpr[] = new LogicalExpression[conditions.size()]; LogicalExpression rightExpr[] = new LogicalExpression[conditions.size()]; - IterOutcome lastLeftStatus = status.getLastLeft(); - IterOutcome lastRightStatus = status.getLastRight(); + IterOutcome lastLeftStatus = status.getLeftStatus(); + IterOutcome lastRightStatus = status.getRightStatus(); for (int i = 0; i < conditions.size(); i++) { JoinCondition condition = conditions.get(i); - leftExpr[i] = materializeExpression(condition.getLeft(), lastLeftStatus, left, collector); - rightExpr[i] = materializeExpression(condition.getRight(), lastRightStatus, right, collector); + leftExpr[i] = materializeExpression(condition.getLeft(), lastLeftStatus, leftIterator, collector); + rightExpr[i] = materializeExpression(condition.getRight(), lastRightStatus, rightIterator, collector); } // if right side is empty, rightExpr will most likely default to NULLABLE INT which may cause the following // call to throw an exception. In this case we can safely skip adding the casts if (lastRightStatus != IterOutcome.NONE) { - JoinUtils.addLeastRestrictiveCasts(leftExpr, left, rightExpr, right, context); + JoinUtils.addLeastRestrictiveCasts(leftExpr, leftIterator, rightExpr, rightIterator, context); } //generate doCompare() method ///////////////////////////////////////// generateDoCompare(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, rightExpr, - incomingRightRecordBatch, collector); - - //generate doCompareNextLeftKey() method - ///////////////////////////////////////// - generateDoCompareNextLeft(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, joinStatus, collector); + incomingRightRecordBatch, collector); // generate copyLeft() ////////////////////// cg.setMappingSet(copyLeftMapping); int vectorId = 0; - if (worker == null || status.isLeftPositionAllowed()) { - for (VectorWrapper<?> vw : left) { + if (worker == null || !status.left.finished()) { + for (VectorWrapper<?> vw : leftIterator) { MajorType inputType = vw.getField().getType(); MajorType outputType; if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) { @@ -406,15 +333,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { outputType = inputType; } + // TODO (DRILL-4011): Factor out CopyUtil and use it here. JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft", - new TypedFieldId(inputType, vectorId)); + new TypedFieldId(inputType, vectorId)); JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(outputType,vectorId)); + new TypedFieldId(outputType,vectorId)); // todo: check result of copyFromSafe and grow allocation cg.getEvalBlock().add(vvOut.invoke("copyFromSafe") - .arg(copyLeftMapping.getValueReadIndex()) - .arg(copyLeftMapping.getValueWriteIndex()) - .arg(vvIn)); + .arg(copyLeftMapping.getValueReadIndex()) + .arg(copyLeftMapping.getValueWriteIndex()) + .arg(vvIn)); ++vectorId; } } @@ -424,8 +352,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { cg.setMappingSet(copyRightMappping); int rightVectorBase = vectorId; - if (status.getLastRight() != IterOutcome.NONE && (worker == null || status.isRightPositionAllowed())) { - for (VectorWrapper<?> vw : right) { + if (status.getRightStatus() != IterOutcome.NONE && (worker == null || !status.right.finished())) { + for (VectorWrapper<?> vw : rightIterator) { MajorType inputType = vw.getField().getType(); MajorType outputType; if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) { @@ -433,15 +361,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { outputType = inputType; } + // TODO (DRILL-4011): Factor out CopyUtil and use it here. JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight", - new TypedFieldId(inputType, vectorId - rightVectorBase)); + new TypedFieldId(inputType, vectorId - rightVectorBase)); JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing", - new TypedFieldId(outputType,vectorId)); + new TypedFieldId(outputType,vectorId)); // todo: check result of copyFromSafe and grow allocation cg.getEvalBlock().add(vvOut.invoke("copyFromSafe") - .arg(copyRightMappping.getValueReadIndex()) - .arg(copyRightMappping.getValueWriteIndex()) - .arg(vvIn)); + .arg(copyRightMappping.getValueReadIndex()) + .arg(copyRightMappping.getValueWriteIndex()) + .arg(vvIn)); ++vectorId; } } @@ -455,13 +384,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // allocate new batch space. container.zeroVectors(); - boolean leftAllowed = status.getLastLeft() != IterOutcome.NONE; - boolean rightAllowed = status.getLastRight() != IterOutcome.NONE; + boolean leftAllowed = status.getLeftStatus() != IterOutcome.NONE; + boolean rightAllowed = status.getRightStatus() != IterOutcome.NONE; if (newSchema) { - // add fields from both batches + // add fields from both batches if (leftAllowed) { - for (VectorWrapper<?> w : left) { + for (VectorWrapper<?> w : leftIterator) { MajorType inputType = w.getField().getType(); MajorType outputType; if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) { @@ -477,9 +406,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } } } - if (rightAllowed) { - for (VectorWrapper<?> w : right) { + for (VectorWrapper<?> w : rightIterator) { MajorType inputType = w.getField().getType(); MajorType outputType; if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) { @@ -496,7 +424,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } } } - for (VectorWrapper w : container) { AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE); } @@ -506,16 +433,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, - LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression, - JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException { + LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression, + JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException { cg.setMappingSet(compareMapping); - if (status.getLastRight() != IterOutcome.NONE) { + if (status.getRightStatus() != IterOutcome.NONE) { assert leftExpression.length == rightExpression.length; for (int i = 0; i < leftExpression.length; i++) { - - // generate compare() //////////////////////// cg.setMappingSet(compareMapping); @@ -527,17 +452,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(rightExpression[i], false); LogicalExpression fh = - FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareLeftExprHolder, - compareRightExprHolder, - context.getFunctionRegistry()); + FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareLeftExprHolder, + compareRightExprHolder, + context.getFunctionRegistry()); HoldingContainer out = cg.addExpr(fh, false); // If not 0, it means not equal. // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal. if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional() - && ! areNullsEqual) { + && ! areNullsEqual) { JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)). - cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))); + cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))); jc._then()._return(JExpr.lit(1)); jc._elseif(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue()); } else { @@ -551,7 +476,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } private LogicalExpression materializeExpression(LogicalExpression expression, IterOutcome lastStatus, - RecordBatch input, ErrorCollector collector) throws ClassTransformationException { + VectorAccessible input, ErrorCollector collector) throws ClassTransformationException { LogicalExpression materializedExpr = null; if (lastStatus != IterOutcome.NONE) { materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry()); @@ -560,8 +485,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } if (collector.hasErrors()) { throw new ClassTransformationException(String.format( - "Failure while trying to materialize incoming field from %s batch. Errors:\n %s.", - (input == left ? LEFT_INPUT : RIGHT_INPUT), collector.toErrorString())); + "Failure while trying to materialize incoming field from %s batch. Errors:\n %s.", + (input == leftIterator ? LEFT_INPUT : RIGHT_INPUT), collector.toErrorString())); } return materializedExpr; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java deleted file mode 100644 index 279801022..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.join; - -import io.netty.buffer.DrillBuf; - -import java.util.List; - -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.ValueVector; - -import com.google.common.collect.ArrayListMultimap; - -public class MergeJoinBatchBuilder implements AutoCloseable { - - private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create(); - private VectorContainer container; - private int runningBytes; - private int runningBatches; - private int recordCount; - private PreAllocator svAllocator; - private boolean svAllocatorUsed = false; - private JoinStatus status; - - public MergeJoinBatchBuilder(BufferAllocator allocator, JoinStatus status) { - this.container = new VectorContainer(); - this.status = status; - this.svAllocator = allocator.getNewPreAllocator(); - } - - public boolean add(RecordBatch batch) { - if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) { - throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch."); - } - if (batch.getRecordCount() == 0) { - return true; // skip over empty record batches. - } - - // resource checks - long batchBytes = getSize(batch); - if (batchBytes + runningBytes > Integer.MAX_VALUE) { - return false; // TODO: 2GB is arbitrary - } - if (runningBatches++ >= Character.MAX_VALUE) { - return false; // allowed in batch. - } - if (!svAllocator.preAllocate(batch.getRecordCount()*4)) { - return false; // sv allocation available. - } - - // transfer VVs to a new RecordBatchData - RecordBatchData bd = new RecordBatchData(batch); - runningBytes += batchBytes; - queuedRightBatches.put(batch.getSchema(), bd); - recordCount += bd.getRecordCount(); - return true; - } - - private long getSize(RecordBatch batch) { - long bytes = 0; - for (VectorWrapper<?> v : batch) { - bytes += v.getValueVector().getBufferSize(); - } - return bytes; - } - - public void build() throws SchemaChangeException { - container.clear(); - if (queuedRightBatches.size() > Character.MAX_VALUE) { - throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); - } - final DrillBuf drillBuf = svAllocator.getAllocation(); - svAllocatorUsed = true; - status.sv4 = new SelectionVector4(drillBuf, recordCount, Character.MAX_VALUE); - BatchSchema schema = queuedRightBatches.keySet().iterator().next(); - List<RecordBatchData> data = queuedRightBatches.get(schema); - - // now we're going to generate the sv4 pointers - switch (schema.getSelectionVectorMode()) { - case NONE: { - int index = 0; - int recordBatchId = 0; - for (RecordBatchData d : data) { - for (int i =0; i < d.getRecordCount(); i++, index++) { - status.sv4.set(index, recordBatchId, i); - } - recordBatchId++; - } - break; - } - case TWO_BYTE: { - int index = 0; - int recordBatchId = 0; - for (RecordBatchData d : data) { - for (int i =0; i < d.getRecordCount(); i++, index++) { - status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); - } - // might as well drop the selection vector since we'll stop using it now. - d.getSv2().clear(); - recordBatchId++; - } - break; - } - default: - throw new UnsupportedOperationException(); - } - - // next, we'll create lists of each of the vector types. - ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create(); - for (RecordBatchData rbd : queuedRightBatches.values()) { - for (ValueVector v : rbd.getVectors()) { - vectors.put(v.getField(), v); - } - } - - for (MaterializedField f : vectors.keySet()) { - List<ValueVector> v = vectors.get(f); - container.addHyperList(v); - } - - container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); - } - - @Override - public void close() { - if (!svAllocatorUsed) { - final DrillBuf drillBuf = svAllocator.getAllocation(); - if (drillBuf != null) { - drillBuf.release(); - } - } - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index f68bed156..6bfb483e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -159,4 +159,19 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< vectors = (T[]) ArrayUtils.add(vectors, vv); } + /** + * Transfer vectors to destination HyperVectorWrapper. + * Both this and destination must be of same type and have same number of vectors. + * @param destination destination HyperVectorWrapper. + */ + public void transfer(VectorWrapper<?> destination) { + Preconditions.checkArgument(destination instanceof HyperVectorWrapper); + Preconditions.checkArgument(getField().getType().equals(destination.getField().getType())); + Preconditions.checkArgument(vectors.length == ((HyperVectorWrapper)destination).vectors.length); + + ValueVector[] destionationVectors = ((HyperVectorWrapper)destination).vectors; + for (int i = 0; i < vectors.length; ++i) { + vectors[i].makeTransferPair(destionationVectors[i]).transfer(); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java new file mode 100644 index 000000000..faa4d830d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeRangeMap; + +/** + * RecordIterator iterates over incoming record batches one record at a time. + * It allows to mark a position during iteration and reset back. + * RecordIterator will hold onto multiple record batches in order to support resetting beyond record batch boundary. + */ +public class RecordIterator implements VectorAccessible { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordIterator.class); + + private final RecordBatch incoming; + private final AbstractRecordBatch<?> outgoing; + private long outerPosition; // Tracks total records consumed so far, works across batches. + private int innerPosition; // Index within current vector container. + private int innerRecordCount; // Records in current container. + private long totalRecordCount; // Total records read so far. + private long startBatchPosition; // Start offset of current batch. + private int markedInnerPosition; + private long markedOuterPosition; + private IterOutcome lastOutcome; + private int inputIndex; // For two way merge join 0:left, 1:right + private boolean lastBatchRead; // True if all batches are consumed. + private boolean initialized; + + private final VectorContainer container; // Holds VectorContainer of current record batch + private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create(); + + public RecordIterator(RecordBatch incoming, + AbstractRecordBatch<?> outgoing, + OperatorContext oContext, + int inputIndex) { + this.incoming = incoming; + this.outgoing = outgoing; + this.inputIndex = inputIndex; + this.lastBatchRead = false; + this.container = new VectorContainer(oContext); + resetIndices(); + this.initialized = false; + } + + private void resetIndices() { + this.innerPosition = -1; + this.startBatchPosition = -1; + this.outerPosition = -1; + this.totalRecordCount = 0; + this.innerRecordCount = 0; + this.markedInnerPosition = -1; + this.markedOuterPosition = -1; + } + + // Get next record batch. + private void nextBatch() { + // We have already seen last batch. + if (lastBatchRead) { + return; + } + lastOutcome = outgoing.next(inputIndex, incoming); + } + + public void mark() { + // Release all batches before current batch. [0 to startBatchPosition). + final Map<Range<Long>,RecordBatchData> oldBatches = batches.subRangeMap(Range.closedOpen(0l, startBatchPosition)).asMapOfRanges(); + for (Range<Long> range : oldBatches.keySet()) { + oldBatches.get(range.lowerEndpoint()).clear(); + } + batches.remove(Range.closedOpen(0l, startBatchPosition)); + markedInnerPosition = innerPosition; + markedOuterPosition = outerPosition; + } + + public void reset() { + if (markedOuterPosition >= 0) { + // Move to rbd for markedOuterPosition. + final RecordBatchData rbdNew = batches.get(markedOuterPosition); + final RecordBatchData rbdOld = batches.get(startBatchPosition); + Preconditions.checkArgument(rbdOld != null); + Preconditions.checkArgument(rbdNew != null); + if (rbdNew != rbdOld) { + container.transferOut(rbdOld.getContainer()); + container.transferIn(rbdNew.getContainer()); + } + innerPosition = markedInnerPosition; + outerPosition = markedOuterPosition; + startBatchPosition = batches.getEntry(outerPosition).getKey().lowerEndpoint(); + innerRecordCount = (int)(batches.getEntry(outerPosition).getKey().upperEndpoint() - startBatchPosition); + markedInnerPosition = -1; + markedOuterPosition = -1; + } + } + + // Move forward by delta (may cross one or more record batches) + public void forward(long delta) { + Preconditions.checkArgument(delta >= 0); + Preconditions.checkArgument(delta + outerPosition < totalRecordCount); + final long nextOuterPosition = delta + outerPosition; + final RecordBatchData rbdNew = batches.get(nextOuterPosition); + final RecordBatchData rbdOld = batches.get(outerPosition); + Preconditions.checkArgument(rbdNew != null); + Preconditions.checkArgument(rbdOld != null); + container.transferOut(rbdOld.getContainer()); + // Get vectors from new position. + container.transferIn(rbdNew.getContainer()); + outerPosition = nextOuterPosition; + startBatchPosition = batches.getEntry(outerPosition).getKey().lowerEndpoint(); + innerPosition = (int)(outerPosition - startBatchPosition); + innerRecordCount = (int)(batches.getEntry(outerPosition).getKey().upperEndpoint() - startBatchPosition); + } + + /** + * buildSchema calls next() in order to read schema quikcly. + * Make sure we have fetched next non-empty batch at the end of the prepare. + * After prepare position of iterator is at 0. + */ + public void prepare() { + while (!lastBatchRead && outerPosition == -1) { + next(); + } + } + + /** + * Move iterator to next record. + * @return + * Status of current record batch read. + */ + public IterOutcome next() { + if (finished()) { + return lastOutcome; + } + long nextOuterPosition = outerPosition + 1; + final int nextInnerPosition = innerPosition + 1; + if (!initialized || nextOuterPosition >= totalRecordCount) { + nextBatch(); + switch (lastOutcome) { + case NONE: + case STOP: + // No more data, disallow reads unless reset is called. + outerPosition = nextOuterPosition; + lastBatchRead = true; + break; + case OK_NEW_SCHEMA: + case OK: + // If Schema changes in the middle of the execution clear out data. + if (initialized && lastOutcome == IterOutcome.OK_NEW_SCHEMA) { + clear(); + resetIndices(); + initialized = false; + nextOuterPosition = 0; + } + // Transfer vectors from incoming record batch. + final RecordBatchData rbd = new RecordBatchData(incoming); + innerRecordCount = incoming.getRecordCount(); + if (!initialized) { + for (VectorWrapper<?> w : rbd.getContainer()) { + container.addOrGet(w.getField()); + } + container.buildSchema(rbd.getContainer().getSchema().getSelectionVectorMode()); + initialized = true; + } + if (innerRecordCount > 0) { + // Transfer vectors back to old batch. + if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) { + container.transferOut(batches.get(outerPosition).getContainer()); + } + container.transferIn(rbd.getContainer()); + startBatchPosition = nextOuterPosition; + batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd); + innerPosition = 0; + outerPosition = nextOuterPosition; + totalRecordCount += innerRecordCount; + } else { + // Release schema/empty batches. + rbd.clear(); + } + break; + case OUT_OF_MEMORY: + return lastOutcome; + case NOT_YET: + default: + throw new UnsupportedOperationException("Unsupported outcome received " + lastOutcome); + } + } else { + outerPosition = nextOuterPosition; + innerPosition = nextInnerPosition; + } + return lastOutcome; + } + + public boolean finished() { + return lastBatchRead && outerPosition >= totalRecordCount; + } + + public IterOutcome getLastOutcome() { + return lastOutcome; + } + + public long getTotalRecordCount() { + return totalRecordCount; + } + + public int getInnerRecordCount() { + return innerRecordCount; + } + + public long getOuterPosition() { + return outerPosition; + } + + public int getCurrentPosition() { + Preconditions.checkArgument(initialized); + Preconditions.checkArgument(innerPosition >= 0 && innerPosition < innerRecordCount); + return innerPosition; + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { + Preconditions.checkArgument(initialized); + return container.getValueAccessorById(clazz, ids); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + Preconditions.checkArgument(initialized); + return container.getValueVectorId(path); + } + + @Override + public BatchSchema getSchema() { + Preconditions.checkArgument(initialized); + return container.getSchema(); + } + + @Override + public int getRecordCount() { + Preconditions.checkArgument(initialized); + return innerRecordCount; + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + Preconditions.checkArgument(initialized); + return container.iterator(); + } + + // Release all vectors held by record batches, clear out range map. + public void clear() { + if (container != null) { + container.clear(); + } + for (RecordBatchData d : batches.asMapOfRanges().values()) { + d.clear(); + } + batches.clear(); + } + + // Deplete incoming batches. + public void clearInflightBatches() { + while (lastOutcome == IterOutcome.OK || lastOutcome == IterOutcome.OK_NEW_SCHEMA) { + // Clear all buffers from incoming. + for (VectorWrapper<?> wrapper : incoming) { + wrapper.getValueVector().clear(); + } + lastOutcome = incoming.next(); + } + } + + public void close() { + clear(); + clearInflightBatches(); + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index 85ec4667a..f767e74ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -33,32 +33,32 @@ import org.apache.drill.exec.vector.complex.ListVector; import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.UnionVector; -import java.util.ArrayList; import java.util.List; +import com.google.common.base.Preconditions; public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class); - private T v; + private T vector; public SimpleVectorWrapper(T v) { - this.v = v; + this.vector = v; } @SuppressWarnings("unchecked") @Override public Class<T> getVectorClass() { - return (Class<T>) v.getClass(); + return (Class<T>) vector.getClass(); } @Override public MaterializedField getField() { - return v.getField(); + return vector.getField(); } @Override public T getValueVector() { - return v; + return vector; } @Override @@ -74,14 +74,14 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper @SuppressWarnings("unchecked") @Override public VectorWrapper<T> cloneAndTransfer() { - TransferPair tp = v.getTransferPair(); + TransferPair tp = vector.getTransferPair(); tp.transfer(); return new SimpleVectorWrapper<T>((T) tp.getTo()); } @Override public void clear() { - v.clear(); + vector.clear(); } public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v) { @@ -95,7 +95,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper return this; } - ValueVector vector = v; + ValueVector vector = this.vector; for (int i = 1; i < ids.length; i++) { final AbstractMapVector mapLike = AbstractMapVector.class.cast(vector); if (mapLike == null) { @@ -109,15 +109,15 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper @Override public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { - if (!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) { + if (!expectedPath.getRootSegment().segmentEquals(vector.getField().getPath().getRootSegment())) { return null; } PathSegment seg = expectedPath.getRootSegment(); - if (v instanceof UnionVector) { + if (vector instanceof UnionVector) { TypedFieldId.Builder builder = TypedFieldId.newBuilder(); builder.addId(id).remainder(expectedPath.getRootSegment().getChild()); - List<MinorType> minorTypes = ((UnionVector) v).getSubTypes(); + List<MinorType> minorTypes = ((UnionVector) vector).getSubTypes(); MajorType.Builder majorTypeBuilder = MajorType.newBuilder().setMinorType(MinorType.UNION); for (MinorType type : minorTypes) { majorTypeBuilder.addSubType(type); @@ -128,28 +128,28 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper builder.finalType(majorType); return builder.build(); } else { - return ((UnionVector) v).getFieldIdIfMatches(builder, false, seg.getChild()); + return ((UnionVector) vector).getFieldIdIfMatches(builder, false, seg.getChild()); } - } else if (v instanceof ListVector) { - ListVector list = (ListVector) v; + } else if (vector instanceof ListVector) { + ListVector list = (ListVector) vector; TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(v.getField().getType()); + builder.intermediateType(vector.getField().getType()); builder.addId(id); return list.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); } else - if (v instanceof AbstractContainerVector) { + if (vector instanceof AbstractContainerVector) { // we're looking for a multi path. - AbstractContainerVector c = (AbstractContainerVector) v; + AbstractContainerVector c = (AbstractContainerVector) vector; TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(v.getField().getType()); + builder.intermediateType(vector.getField().getType()); builder.addId(id); return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); } else { TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(v.getField().getType()); + builder.intermediateType(vector.getField().getType()); builder.addId(id); - builder.finalType(v.getField().getType()); + builder.finalType(vector.getField().getType()); if (seg.isLastPath()) { return builder.build(); } else { @@ -157,7 +157,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper if (child.isArray() && child.isLastPath()) { builder.remainder(child); builder.withIndex(); - builder.finalType(v.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build()); + builder.finalType(vector.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build()); return builder.build(); } else { return null; @@ -167,4 +167,10 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper } } + public void transfer(VectorWrapper<?> destination) { + Preconditions.checkArgument(destination instanceof SimpleVectorWrapper); + Preconditions.checkArgument(getField().getType().equals(destination.getField().getType())); + vector.makeTransferPair(((SimpleVectorWrapper)destination).vector).transfer(); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 57278247f..ef22d521d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -29,6 +29,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; @@ -80,6 +81,25 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess add(vv, releasable); } + /** + * Transfer vectors from containerIn to this. + */ + void transferIn(VectorContainer containerIn) { + Preconditions.checkArgument(this.wrappers.size() == containerIn.wrappers.size()); + for (int i = 0; i < this.wrappers.size(); ++i) { + containerIn.wrappers.get(i).transfer(this.wrappers.get(i)); + } + } + + /** + * Transfer vectors from this to containerOut + */ + void transferOut(VectorContainer containerOut) { + Preconditions.checkArgument(this.wrappers.size() == containerOut.wrappers.size()); + for (int i = 0; i < this.wrappers.size(); ++i) { + this.wrappers.get(i).transfer(containerOut.wrappers.get(i)); + } + } public <T extends ValueVector> T addOrGet(MaterializedField field) { return addOrGet(field, null); @@ -97,6 +117,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess return (T) newVector; } } else { + vector = TypeHelper.getNewVector(field, this.oContext.getAllocator(), callBack); add(vector); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index dc8ffe582..5250f98a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -33,6 +33,7 @@ public interface VectorWrapper<T extends ValueVector> { public void clear(); public VectorWrapper<T> cloneAndTransfer(); public VectorWrapper<?> getChildWrapper(int[] ids); + public void transfer(VectorWrapper<?> destination); /** * Traverse the object graph and determine whether the provided SchemaPath matches data within the Wrapper. If so, return a TypedFieldId associated with this path. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java index 55173f93a..40ee63d42 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java @@ -40,21 +40,22 @@ public class BatchPrinter { numBatches = vw.getValueVectors().length; } int width = columns.size(); - for (int j = 0; j < sv4.getCount(); j++) { - for (VectorWrapper vw : batch) { - Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(j & 65535); - if (o instanceof byte[]) { - String value = new String((byte[]) o); - System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); - } else { - String value = o.toString(); - System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14)); - } - } - System.out.printf("|\n"); + for (int j = 0; j < sv4.getCount(); j++) { + for (VectorWrapper vw : batch) { + Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535); + if (o instanceof byte[]) { + String value = new String((byte[]) o); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); + } else { + String value = o.toString(); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14)); } + } System.out.printf("|\n"); + } + System.out.printf("|\n"); } + public static void printBatch(VectorAccessible batch) { List<String> columns = Lists.newArrayList(); List<ValueVector> vectors = Lists.newArrayList(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 4b155ee2b..0b52b9e36 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -184,10 +184,10 @@ public class TestExampleQueries extends BaseTestQuery { @Test public void testPushExpInJoinConditionInnerJoin() throws Exception { test("select a.n_nationkey from cp.`tpch/nation.parquet` a join cp.`tpch/region.parquet` b " + "" + - " on a.n_regionkey + 100 = b.r_regionkey + 200" + // expressions in both sides of equal join filter - " and (substr(a.n_name,1,3)= 'L1' or substr(a.n_name,2,2) = 'L2') " + // left filter - " and (substr(b.r_name,1,3)= 'R1' or substr(b.r_name,2,2) = 'R2') " + // right filter - " and (substr(a.n_name,2,3)= 'L3' or substr(b.r_name,3,2) = 'R3');"); // non-equal join filter + " on a.n_regionkey + 100 = b.r_regionkey + 200" + // expressions in both sides of equal join filter + " and (substr(a.n_name,1,3)= 'L1' or substr(a.n_name,2,2) = 'L2') " + // left filter + " and (substr(b.r_name,1,3)= 'R1' or substr(b.r_name,2,2) = 'R2') " + // right filter + " and (substr(a.n_name,2,3)= 'L3' or substr(b.r_name,3,2) = 'R3');"); // non-equal join filter } @Test diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java index 54539fd2e..ec7411d10 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java @@ -23,8 +23,15 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Random; + public class TestMergeJoinAdvanced extends BaseTestQuery { // Have to disable hash join to test merge join in this class @@ -96,4 +103,99 @@ public class TestMergeJoinAdvanced extends BaseTestQuery { setSessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, String.valueOf(ExecConstants.MAX_WIDTH_PER_NODE.getDefault().num_val)); } } + + private static void generateData(final BufferedWriter leftWriter, final BufferedWriter rightWriter, + final long left, final long right) throws IOException { + for (int i=0; i < left; ++i) { + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10000, i)); + } + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10001, 10001)); + leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10002, 10002)); + + for (int i=0; i < right; ++i) { + rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10000, i)); + } + rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10004, 10004)); + rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10005, 10005)); + rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10006, 10006)); + + leftWriter.close(); + rightWriter.close(); + } + + private static void testMultipleBatchJoin(final long right, final long left, + final String joinType, final long expected) throws Exception { + final String leftSide = BaseTestQuery.getTempDir("merge-join-left.json"); + final String rightSide = BaseTestQuery.getTempDir("merge-join-right.json"); + final BufferedWriter leftWriter = new BufferedWriter(new FileWriter(new File(leftSide))); + final BufferedWriter rightWriter = new BufferedWriter(new FileWriter(new File(rightSide))); + generateData(leftWriter, rightWriter, left, right); + final String query1 = String.format("select count(*) c1 from dfs_test.`%s` L %s join dfs_test.`%s` R on L.k=R.k1", + leftSide, joinType, rightSide); + testBuilder() + .sqlQuery(query1) + .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false") + .unOrdered() + .baselineColumns("c1") + .baselineValues(expected) + .go(); + } + + @Test + public void testMergeInnerJoinLargeRight() throws Exception { + testMultipleBatchJoin(1000l, 5000l, "inner", 5000l * 1000l); + } + + @Test + public void testMergeLeftJoinLargeRight() throws Exception { + testMultipleBatchJoin(1000l, 5000l, "left", 5000l * 1000l +2l); + } + + @Test + public void testMergeRightJoinLargeRight() throws Exception { + testMultipleBatchJoin(1000l, 5000l, "right", 5000l*1000l +3l); + } + + @Test + public void testMergeInnerJoinLargeLeft() throws Exception { + testMultipleBatchJoin(5000l, 1000l, "inner", 5000l*1000l); + } + + @Test + public void testMergeLeftJoinLargeLeft() throws Exception { + testMultipleBatchJoin(5000l, 1000l, "left", 5000l*1000l + 2l); + } + + @Test + public void testMergeRightJoinLargeLeft() throws Exception { + testMultipleBatchJoin(5000l, 1000l, "right", 5000l*1000l + 3l); + } + + // Following tests can take some time. + @Test + @Ignore + public void testMergeInnerJoinRandomized() throws Exception { + final Random r = new Random(); + final long right = r.nextInt(10001) + 1l; + final long left = r.nextInt(10001) + 1l; + testMultipleBatchJoin(left, right, "inner", left*right); + } + + @Test + @Ignore + public void testMergeLeftJoinRandomized() throws Exception { + final Random r = new Random(); + final long right = r.nextInt(10001) + 1l; + final long left = r.nextInt(10001) + 1l; + testMultipleBatchJoin(left, right, "left", left*right + 2l); + } + + @Test + @Ignore + public void testMergeRightJoinRandomized() throws Exception { + final Random r = new Random(); + final long right = r.nextInt(10001) + 1l; + final long left = r.nextInt(10001) + 1l; + testMultipleBatchJoin(left, right, "right", left * right + 3l); + } } |