author     Amit Hadke <amit.hadke@gmail.com>    2015-10-07 18:23:21 -0700
committer  Jacques Nadeau <jacques@apache.org>  2015-11-04 20:55:54 -0800
commit     2bc16a90b4883b2b2e3213f5e7a46ff1ea78bd98 (patch)
tree       a74cf223fcd8b9ff117eb24e5b0657f6899ec445 /exec
parent     39582bd60c9eaaaa9b16aba4f099d434e927e7e5 (diff)
DRILL-3793: New MergeJoin and add RecordIterator interface
This closes #190
Diffstat (limited to 'exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java | 279
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java | 229
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | 329
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java | 157
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java | 15
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java | 302
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java | 50
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java | 21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java | 25
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java | 8
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java | 102
14 files changed, 730 insertions, 796 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index 8f72d327d..f7154f822 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -18,177 +18,82 @@
package org.apache.drill.exec.physical.impl.join;
import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
-import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.record.RecordIterator;
import org.apache.calcite.rel.core.JoinRelType;
/**
- * The status of the current join. Maintained outside the individually compiled join templates so that we can carry status across multiple schemas.
+ * Maintain join state.
*/
public final class JoinStatus {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);
private static final int OUTPUT_BATCH_SIZE = 32*1024;
- public static enum RightSourceMode {
- INCOMING, SV4;
- }
-
- private static enum InitState {
- INIT, // initial state
- CHECK, // need to check if batches are empty
- READY // read to do work
- }
-
- private static final int LEFT_INPUT = 0;
- private static final int RIGHT_INPUT = 1;
-
- public final RecordBatch left;
- private int leftPosition;
- private IterOutcome lastLeft;
-
- public final RecordBatch right;
- private int rightPosition;
- private int svRightPosition;
- private IterOutcome lastRight;
+ public final RecordIterator left;
+ public final RecordIterator right;
+ private boolean iteratorInitialized;
private int outputPosition;
- public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
public MergeJoinBatch outputBatch;
- public SelectionVector4 sv4;
-
- private boolean hasIntermediateData;
- private int initialRightPosition = -1;
- private boolean crossedBatchBoundaries;
private final JoinRelType joinType;
+ private boolean allowMarking;
public boolean ok = true;
- private InitState initialSet = InitState.INIT;
- private boolean leftRepeating = false;
- public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
- super();
+ public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch output) {
this.left = left;
this.right = right;
this.outputBatch = output;
this.joinType = output.getJoinType();
+ this.iteratorInitialized = false;
+ this.allowMarking = true;
}
@Override
public String toString() {
return
- super.toString()
+ super.toString()
+ "["
- + "leftPosition = " + leftPosition
- + ", rightPosition = " + rightPosition
- + ", svRightPosition = " + svRightPosition
+ + "leftPosition = " + left.getCurrentPosition()
+ + ", rightPosition = " + right.getCurrentPosition()
+ ", outputPosition = " + outputPosition
- + ", lastLeft = " + lastLeft
- + ", lastRight = " + lastRight
- + ", rightSourceMode = " + rightSourceMode
- + ", sv4 = " + sv4
+ ", joinType = " + joinType
+ ", ok = " + ok
- + ", initialSet = " + initialSet
- + ", leftRepeating = " + leftRepeating
+ + ", initialSet = " + iteratorInitialized
+ ", left = " + left
+ ", right = " + right
+ ", outputBatch = " + outputBatch
+ "]";
}
- public boolean hasIntermediateData() {
- return hasIntermediateData;
- }
-
- public void resetIntermediateData() {
- hasIntermediateData = false;
- }
-
- public void setIntermediateData(int initialRightPosition, boolean crossedBatchBoundaries) {
- this.initialRightPosition = initialRightPosition;
- this.crossedBatchBoundaries = crossedBatchBoundaries;
- this.hasIntermediateData = true;
- }
-
- public int getInitialRightPosition() {
- return initialRightPosition;
- }
-
- public boolean getCrossedBatchBoundaries() {
- return crossedBatchBoundaries;
- }
-
- private final IterOutcome nextLeft() {
- return outputBatch.next(LEFT_INPUT, left);
- }
-
- private final IterOutcome nextRight() {
- return outputBatch.next(RIGHT_INPUT, right);
- }
-
- public final void ensureInitial() {
- switch(initialSet) {
- case INIT:
- this.lastLeft = nextLeft();
- this.lastRight = nextRight();
- initialSet = InitState.CHECK;
- break;
- case CHECK:
- if (lastLeft != IterOutcome.NONE && left.getRecordCount() == 0) {
- this.lastLeft = nextLeft();
- }
- if (lastRight != IterOutcome.NONE && right.getRecordCount() == 0) {
- this.lastRight = nextRight();
- }
- initialSet = InitState.READY;
- // fall through
- default:
- break;
+ // Initialize left and right record iterators. We avoid doing this in the constructor.
+ // Callers must check the state of each iterator after calling initialize().
+ public void initialize() {
+ if (!iteratorInitialized) {
+ left.next();
+ right.next();
+ iteratorInitialized = true;
}
}
- public final void advanceLeft() {
- leftPosition++;
- }
-
- public final void advanceRight() {
- if (rightSourceMode == RightSourceMode.INCOMING) {
- rightPosition++;
- } else {
- svRightPosition++;
+ public void prepare() {
+ if (!iteratorInitialized) {
+ initialize();
}
+ left.prepare();
+ right.prepare();
}
- public final int getLeftPosition() {
- return leftPosition;
- }
-
- public final int getRightPosition() {
- return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
- }
-
- public final int getRightCount() {
- return right.getRecordCount();
- }
-
- public final void setRightPosition(int pos) {
- rightPosition = pos;
- }
+ public IterOutcome getLeftStatus() { return left.getLastOutcome(); }
+ public IterOutcome getRightStatus() { return right.getLastOutcome(); }
public final int getOutPosition() {
return outputPosition;
}
- public final int fetchAndIncOutputPos() {
- return outputPosition++;
- }
-
public final void resetOutputPos() {
outputPosition = 0;
}
@@ -198,145 +103,61 @@ public final class JoinStatus {
}
public final void incOutputPos() {
- outputPosition++;
- }
-
- public final void notifyLeftRepeating() {
- leftRepeating = true;
- outputBatch.resetBatchBuilder();
- }
-
- public final void notifyLeftStoppedRepeating() {
- leftRepeating = false;
- svRightPosition = 0;
+ ++outputPosition;
}
- public final boolean isLeftRepeating() {
- return leftRepeating;
+ public void disableMarking() {
+ allowMarking = false;
}
- public void setDefaultAdvanceMode() {
- rightSourceMode = RightSourceMode.INCOMING;
+ public void enableMarking() {
+ allowMarking = true;
}
- public void setSV4AdvanceMode() {
- rightSourceMode = RightSourceMode.SV4;
- svRightPosition = 0;
+ public boolean shouldMark() {
+ return allowMarking;
}
/**
- * Check if the left record position can advance by one.
- * Side effect: advances to next left batch if current left batch size is exceeded.
+ * Return the state of the join based on the status of the left and right iterators.
+ * @return
+ * 1. JoinOutcome.NO_MORE_DATA : the join is finished
+ * 2. JoinOutcome.FAILURE : an error occurred during the join.
+ * 3. JoinOutcome.BATCH_RETURNED : one of the sides has data
+ * 4. JoinOutcome.SCHEMA_CHANGED : one of the sides has a schema change.
*/
- public final boolean isLeftPositionAllowed() {
- if (lastLeft == IterOutcome.NONE) {
- return false;
- }
- if (!isLeftPositionInCurrentBatch()) {
- leftPosition = 0;
- releaseData(left);
- lastLeft = nextLeft();
- return lastLeft == IterOutcome.OK;
- }
- lastLeft = IterOutcome.OK;
- return true;
- }
-
- /**
- * Check if the right record position can advance by one.
- * Side effect: advances to next right batch if current right batch size is exceeded
- */
- public final boolean isRightPositionAllowed() {
- if (rightSourceMode == RightSourceMode.SV4) {
- return svRightPosition < sv4.getCount();
- }
- if (lastRight == IterOutcome.NONE) {
- return false;
- }
- if (!isRightPositionInCurrentBatch()) {
- rightPosition = 0;
- releaseData(right);
- lastRight = nextRight();
- return lastRight == IterOutcome.OK;
- }
- lastRight = IterOutcome.OK;
- return true;
- }
-
- private void releaseData(RecordBatch b) {
- for (VectorWrapper<?> v : b) {
- v.clear();
- }
- if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
- b.getSelectionVector2().clear();
- }
- }
-
- /**
- * Check if the left record position can advance by one in the current batch.
- */
- public final boolean isLeftPositionInCurrentBatch() {
- return leftPosition < left.getRecordCount();
- }
-
- /**
- * Check if the right record position can advance by one in the current batch.
- */
- public final boolean isRightPositionInCurrentBatch() {
- return rightPosition < right.getRecordCount();
- }
-
- /**
- * Check if the next left record position can advance by one in the current batch.
- */
- public final boolean isNextLeftPositionInCurrentBatch() {
- return leftPosition + 1 < left.getRecordCount();
- }
-
- public IterOutcome getLastRight() {
- return lastRight;
- }
-
- public IterOutcome getLastLeft() {
- return lastLeft;
- }
-
- /**
- * Check if the next left record position can advance by one in the current batch.
- */
- public final boolean isNextRightPositionInCurrentBatch() {
- return rightPosition + 1 < right.getRecordCount();
- }
-
public JoinOutcome getOutcome() {
if (!ok) {
return JoinOutcome.FAILURE;
}
if (bothMatches(IterOutcome.NONE) ||
- (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) ||
- (joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) ||
- (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) {
+ (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) ||
+ (joinType == JoinRelType.LEFT && getLeftStatus() == IterOutcome.NONE) ||
+ (joinType == JoinRelType.RIGHT && getRightStatus() == IterOutcome.NONE)) {
return JoinOutcome.NO_MORE_DATA;
}
if (bothMatches(IterOutcome.OK) ||
- (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) {
+ (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) {
return JoinOutcome.BATCH_RETURNED;
}
if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) {
return JoinOutcome.SCHEMA_CHANGED;
}
+ // should never see NOT_YET
if (eitherMatches(IterOutcome.NOT_YET)) {
return JoinOutcome.WAITING;
}
+ ok = false;
+ // On STOP or OUT_OF_MEMORY, return FAILURE.
return JoinOutcome.FAILURE;
}
private boolean bothMatches(IterOutcome outcome) {
- return lastLeft == outcome && lastRight == outcome;
+ return getLeftStatus() == outcome && getRightStatus() == outcome;
}
private boolean eitherMatches(IterOutcome outcome) {
- return lastLeft == outcome || lastRight == outcome;
+ return getLeftStatus() == outcome || getRightStatus() == outcome;
}
-}
+}
\ No newline at end of file
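
A minimal sketch (not part of this commit) of how an operator is expected to drive the refactored JoinStatus, using only the methods visible in the hunk above. Position tracking now lives in the RecordIterators themselves; JoinStatus keeps only the output position and the marking flag. The names leftIterator, rightIterator and mergeJoinBatch are assumed to be constructed elsewhere (compare the MergeJoinBatch changes below).

  // Sketch only: drive the join state machine from the owning operator.
  JoinStatus status = new JoinStatus(leftIterator, rightIterator, mergeJoinBatch);
  status.prepare();                      // initializes both RecordIterators on first use
  switch (status.getOutcome()) {
    case BATCH_RETURNED:                 // at least one side still has rows
    case SCHEMA_CHANGED:                 // a side reported OK_NEW_SCHEMA
      // run the generated JoinWorker against status.left / status.right
      break;
    case NO_MORE_DATA:                   // finished for this join type
    case FAILURE:                        // STOP or OUT_OF_MEMORY upstream
    default:
      break;
  }
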
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index bb43e83dc..ed900dbae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -26,52 +26,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.calcite.rel.core.JoinRelType;
/**
- * This join template uses a merge join to combine two ordered streams into a single larger batch. When joining
- * single values on each side, the values can be copied to the outgoing batch immediately. The outgoing record batch
- * should be sent as needed (e.g. schema change or outgoing batch full). When joining multiple values on one or
- * both sides, two passes over the vectors will be made; one to construct the selection vector, and another to
- * generate the outgoing batches once the duplicate value is no longer encountered.
- *
- * Given two tables ordered by 'col1':
- *
- * t1 t2
- * --------------- ---------------
- * | key | col2 | | key | col2 |
- * --------------- ---------------
- * | 1 | 'ab' | | 1 | 'AB' |
- * | 2 | 'cd' | | 2 | 'CD' |
- * | 2 | 'ef' | | 4 | 'EF' |
- * | 4 | 'gh' | | 4 | 'GH' |
- * | 4 | 'ij' | | 5 | 'IJ' |
- * --------------- ---------------
- *
- * 'SELECT * FROM t1 INNER JOIN t2 on (t1.key == t2.key)' should generate the following:
- *
- * ---------------------------------
- * | t1.key | t2.key | col1 | col2 |
- * ---------------------------------
- * | 1 | 1 | 'ab' | 'AB' |
- * | 2 | 2 | 'cd' | 'CD' |
- * | 2 | 2 | 'ef' | 'CD' |
- * | 4 | 4 | 'gh' | 'EF' |
- * | 4 | 4 | 'gh' | 'GH' |
- * | 4 | 4 | 'ij' | 'EF' |
- * | 4 | 4 | 'ij' | 'GH' |
- * ---------------------------------
- *
- * In the simple match case, only one row from each table matches. Additional cases should be considered:
- * - a left join key matches multiple right join keys
- * - duplicate keys which may span multiple record batches (on the left and/or right side)
- * - one or both incoming record batches change schemas
- *
- * In the case where a left join key matches multiple right join keys:
- * - add a reference to all of the right table's matching values to the SV4.
- *
- * A RecordBatchData object should be used to hold onto all batches which have not been sent.
- *
- * JoinStatus:
- * - all state related to the join operation is stored in the JoinStatus object.
- * - this is required since code may be regenerated before completion of an outgoing record batch.
+ * Merge Join implementation using RecordIterator.
*/
public abstract class JoinTemplate implements JoinWorker {
@@ -86,138 +41,88 @@ public abstract class JoinTemplate implements JoinWorker {
* @return true if the join succeeded; false if the worker needs to be regenerated
*/
public final boolean doJoin(final JoinStatus status) {
- while(!status.isOutgoingBatchFull()) {
- // for each record
+ final boolean isLeftJoin = (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT);
- // validate input iterators (advancing to the next record batch if necessary)
- if (!status.isRightPositionAllowed()) {
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- // we've hit the end of the right record batch; copy any remaining values from the left batch
- while (status.isLeftPositionAllowed()) {
+ while (!status.isOutgoingBatchFull()) {
+ if (status.right.finished()) {
+ if (isLeftJoin) {
+ while (!status.left.finished()) {
if (status.isOutgoingBatchFull()) {
return true;
}
- doCopyLeft(status.getLeftPosition(), status.getOutPosition());
-
+ doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition());
status.incOutputPos();
- status.advanceLeft();
+ status.left.next();
}
}
return true;
}
- if (!status.isLeftPositionAllowed()) {
+ if (status.left.finished()) {
return true;
}
-
- int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
+ final int comparison = doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition());
switch (comparison) {
-
- case -1:
- // left key < right key
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- doCopyLeft(status.getLeftPosition(), status.getOutPosition());
- status.incOutputPos();
- }
- status.advanceLeft();
- continue;
-
- case 0:
- // left key == right key
-
- // check for repeating values on the left side
- if (!status.isLeftRepeating() &&
- status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) == 0) {
- // subsequent record(s) in the left batch have the same key
- status.notifyLeftRepeating();
- } else if (status.isLeftRepeating() &&
- status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) != 0) {
- // this record marks the end of repeated keys
- status.notifyLeftStoppedRepeating();
- }
-
- boolean crossedBatchBoundaries;
- int initialRightPosition;
- if (status.hasIntermediateData()) {
- crossedBatchBoundaries = status.getCrossedBatchBoundaries();
- initialRightPosition = status.getInitialRightPosition();
- status.resetIntermediateData();
- } else {
- crossedBatchBoundaries = false;
- initialRightPosition = status.getRightPosition();
- }
-
- do {
- if (status.isOutgoingBatchFull()) {
- status.setIntermediateData(initialRightPosition, crossedBatchBoundaries);
- return true;
+ case -1:
+ // left key < right key
+ if (isLeftJoin) {
+ doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition());
+ status.incOutputPos();
}
- // copy all equal right keys to the output record batch
- doCopyLeft(status.getLeftPosition(), status.getOutPosition());
-
- doCopyRight(status.getRightPosition(), status.getOutPosition());
-
- status.incOutputPos();
-
- // If the left key has duplicates and we're about to cross a boundary in the right batch, add the
- // right table's record batch to the sv4 builder before calling next. These records will need to be
- // copied again for each duplicate left key.
- if (status.isLeftRepeating() && !status.isRightPositionInCurrentBatch()) {
- status.outputBatch.addRightToBatchBuilder();
- crossedBatchBoundaries = true;
+ status.left.next();
+ continue;
+
+ case 0:
+ // left key == right key
+ // Mark the current position in the right iterator.
+ // If a mark was set in a previous iteration but the inner loop didn't finish,
+ // skip the current right row, as it was already copied in an earlier iteration.
+ if (status.shouldMark()) {
+ status.right.mark();
+ // Copy all equal keys from right side to the output record batch.
+ doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition());
+ doCopyRight(status.right.getCurrentPosition(), status.getOutPosition());
+ status.incOutputPos();
}
- status.advanceRight();
-
- } while ((!status.isLeftRepeating() || status.isRightPositionInCurrentBatch())
- && status.isRightPositionAllowed()
- && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0);
-
- if (status.getRightPosition() > initialRightPosition &&
- (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) {
- // more than one matching result from right table; reset position in case of subsequent left match
- status.setRightPosition(initialRightPosition);
- }
- status.advanceLeft();
-
- if (status.isLeftRepeating() && status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) != 0) {
- // left no longer has duplicates. switch back to incoming batch mode
- status.setDefaultAdvanceMode();
- status.notifyLeftStoppedRepeating();
- } else if (status.isLeftRepeating() && crossedBatchBoundaries) {
- try {
- // build the right batches and
- status.outputBatch.batchBuilder.build();
- status.setSV4AdvanceMode();
- } catch (SchemaChangeException e) {
- status.ok = false;
+ // Move to next position in right iterator.
+ status.right.next();
+ while (!status.right.finished()) {
+ if (doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition()) == 0) {
+ doCopyLeft(status.left.getCurrentPosition(), status.getOutPosition());
+ doCopyRight(status.right.getCurrentPosition(), status.getOutPosition());
+ status.incOutputPos();
+ if (status.isOutgoingBatchFull()) {
+ // Leave iterators at their current positions and markers.
+ // Don't mark on all subsequent doJoin iterations.
+ status.disableMarking();
+ return true;
+ }
+ status.right.next();
+ } else {
+ break;
+ }
}
- // return to indicate recompile in right-sv4 mode
- return true;
- }
-
- continue;
-
- case 1:
- // left key > right key
- status.advanceRight();
- continue;
-
- default:
- throw new IllegalStateException();
+ status.right.reset();
+ // Enable marking only when we have consumed all equal keys on right side.
+ status.enableMarking();
+ status.left.next();
+ continue;
+ case 1:
+ // left key > right key
+ status.right.next();
+ continue;
+
+ default:
+ throw new IllegalStateException();
}
}
-
return true;
}
// Generated Methods
public abstract void doSetup(@Named("context") FragmentContext context,
- @Named("status") JoinStatus status,
- @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException;
-
+ @Named("status") JoinStatus status,
+ @Named("outgoing") VectorContainer outgoing) throws SchemaChangeException;
/**
* Copy the data to the new record batch (if it fits).
@@ -241,15 +146,5 @@ public abstract class JoinTemplate implements JoinWorker {
* 1 if left is > right
*/
protected abstract int doCompare(@Named("leftIndex") int leftIndex,
- @Named("rightIndex") int rightIndex);
-
-
- /**
- * Compare the current left key to the next left key, if it's within the batch.
- * @return 0 if both keys are equal,
- * 1 if the keys are not equal, and
- * -1 if there are no more keys in this batch
- */
- protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);
-
-}
+ @Named("rightIndex") int rightIndex);
+}
\ No newline at end of file
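
For clarity, a condensed sketch (not part of the patch) of the new doJoin() control flow shown in the hunks above, with the generated copy calls and output-batch-full bookkeeping elided; doCompare/doCopyLeft/doCopyRight stand for the generated methods.

  // Condensed control flow of the rewritten doJoin(); copies and batch-full checks elided.
  while (!status.isOutgoingBatchFull()) {
    if (status.right.finished()) {
      // LEFT join: flush the remaining left rows (doCopyLeft), then stop.
      return true;
    }
    if (status.left.finished()) {
      return true;
    }
    final int cmp = doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition());
    if (cmp < 0) {
      // left < right: for a LEFT join emit the unmatched left row, then advance left.
      status.left.next();
    } else if (cmp > 0) {
      // left > right: advance right.
      status.right.next();
    } else {
      // Equal keys: remember the start of the matching right run ...
      status.right.mark();
      // ... emit (left, right) for every equal right row ...
      while (!status.right.finished()
          && doCompare(status.left.getCurrentPosition(), status.right.getCurrentPosition()) == 0) {
        status.right.next();
      }
      // ... then rewind right so the next left row can match the same run.
      status.right.reset();
      status.left.next();
    }
  }
  return true;

When the outgoing batch fills in the middle of an equal-key run, the actual code additionally calls disableMarking() so the existing mark survives across doJoin() invocations, and enableMarking() again once the run has been fully consumed.
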
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index d39df8f17..2476a838a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.resolver.TypeCastRules;
import java.util.LinkedList;
@@ -162,8 +163,8 @@ public class JoinUtils {
* @param rightBatch right input record batch
* @param context fragment context
*/
- public static void addLeastRestrictiveCasts(LogicalExpression[] leftExpressions, RecordBatch leftBatch,
- LogicalExpression[] rightExpressions, RecordBatch rightBatch,
+ public static void addLeastRestrictiveCasts(LogicalExpression[] leftExpressions, VectorAccessible leftBatch,
+ LogicalExpression[] rightExpressions, VectorAccessible rightBatch,
FragmentContext context) {
assert rightExpressions.length == leftExpressions.length;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index e4e13d1ec..55322f849 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -32,6 +32,7 @@ public interface JoinWorker {
public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
public boolean doJoin(JoinStatus status);
- public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
+ public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(
+ JoinWorker.class, JoinTemplate.class);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 2e777f66b..96113e966 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
import java.io.IOException;
import java.util.List;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -43,74 +44,62 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator;
-import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordIterator;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.calcite.rel.core.JoinRelType;
import com.google.common.base.Preconditions;
import com.sun.codemodel.JClass;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JMod;
-import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
/**
- * A merge join combining to incoming in-order batches.
+ * A join operator that merges two sorted streams using record iterators.
*/
public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
- public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
- public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
public final MappingSet setupMapping =
- new MappingSet("null", "null",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doSetup", null, null));
+ new MappingSet("null", "null",
+ GM("doSetup", "doSetup", null, null),
+ GM("doSetup", "doSetup", null, null));
public final MappingSet copyLeftMapping =
- new MappingSet("leftIndex", "outIndex",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doCopyLeft", null, null));
+ new MappingSet("leftIndex", "outIndex",
+ GM("doSetup", "doSetup", null, null),
+ GM("doSetup", "doCopyLeft", null, null));
public final MappingSet copyRightMappping =
- new MappingSet("rightIndex", "outIndex",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doCopyRight", null, null));
+ new MappingSet("rightIndex", "outIndex",
+ GM("doSetup", "doSetup", null, null),
+ GM("doSetup", "doCopyRight", null, null));
public final MappingSet compareMapping =
- new MappingSet("leftIndex", "rightIndex",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doCompare", null, null));
+ new MappingSet("leftIndex", "rightIndex",
+ GM("doSetup", "doSetup", null, null),
+ GM("doSetup", "doCompare", null, null));
public final MappingSet compareRightMapping =
- new MappingSet("rightIndex", "null",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doCompare", null, null));
- public final MappingSet compareLeftMapping =
- new MappingSet("leftIndex", "null",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doCompareNextLeftKey", null, null));
- public final MappingSet compareNextLeftMapping =
- new MappingSet("nextLeftIndex", "null",
- GM("doSetup", "doSetup", null, null),
- GM("doSetup", "doCompareNextLeftKey", null, null));
-
+ new MappingSet("rightIndex", "null",
+ GM("doSetup", "doSetup", null, null),
+ GM("doSetup", "doCompare", null, null));
private final RecordBatch left;
private final RecordBatch right;
+ private final RecordIterator leftIterator;
+ private final RecordIterator rightIterator;
private final JoinStatus status;
private final List<JoinCondition> conditions;
private final JoinRelType joinType;
private JoinWorker worker;
- public MergeJoinBatchBuilder batchBuilder;
private boolean areNullsEqual = false; // whether nulls compare equal
private static final String LEFT_INPUT = "LEFT INPUT";
@@ -123,10 +112,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
}
this.left = left;
+ this.leftIterator = new RecordIterator(left, this, oContext, 0);
this.right = right;
+ this.rightIterator = new RecordIterator(right, this, oContext, 1);
this.joinType = popConfig.getJoinType();
- this.status = new JoinStatus(left, right, this);
- this.batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
+ this.status = new JoinStatus(leftIterator, rightIterator, this);
this.conditions = popConfig.getConditions();
JoinComparator comparator = JoinComparator.NONE;
@@ -147,11 +137,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
@Override
- public void buildSchema() throws SchemaChangeException {
- status.ensureInitial();
+ public void buildSchema() {
+ // initialize iterators
+ status.initialize();
- final IterOutcome leftOutcome = status.getLastLeft();
- final IterOutcome rightOutcome = status.getLastRight();
+ final IterOutcome leftOutcome = status.getLeftStatus();
+ final IterOutcome rightOutcome = status.getRightStatus();
if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
state = BatchState.STOP;
return;
@@ -161,36 +152,38 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
state = BatchState.OUT_OF_MEMORY;
return;
}
-
allocateBatch(true);
}
@Override
public IterOutcome innerNext() {
// we do this here instead of in the constructor because we don't necessarily want to start consuming on construction.
- status.ensureInitial();
-
+ status.prepare();
// loop so we can start over again if we find a new batch was created.
while (true) {
-
- JoinOutcome outcome = status.getOutcome();
- // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
- if (outcome == JoinOutcome.SCHEMA_CHANGED) {
- allocateBatch(true);
- } else if (outcome == JoinOutcome.BATCH_RETURNED) {
- allocateBatch(false);
- }
-
- // reset the output position to zero after our parent iterates this RecordBatch
- if (outcome == JoinOutcome.BATCH_RETURNED ||
- outcome == JoinOutcome.SCHEMA_CHANGED ||
- outcome == JoinOutcome.NO_MORE_DATA) {
- status.resetOutputPos();
- }
-
- if (outcome == JoinOutcome.NO_MORE_DATA) {
- logger.debug("NO MORE DATA; returning {} NONE");
- return IterOutcome.NONE;
+ // Check result of last iteration.
+ switch (status.getOutcome()) {
+ case BATCH_RETURNED:
+ allocateBatch(false);
+ status.resetOutputPos();
+ break;
+ case SCHEMA_CHANGED:
+ allocateBatch(true);
+ status.resetOutputPos();
+ break;
+ case NO_MORE_DATA:
+ status.resetOutputPos();
+ logger.debug("NO MORE DATA; returning {} NONE");
+ return IterOutcome.NONE;
+ case FAILURE:
+ status.left.clearInflightBatches();
+ status.right.clearInflightBatches();
+ kill(false);
+ return IterOutcome.STOP;
+ case WAITING:
+ return IterOutcome.NOT_YET;
+ default:
+ throw new IllegalStateException();
}
boolean first = false;
@@ -214,36 +207,39 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
worker = null;
}
- // get the outcome of the join.
+ // get the outcome of the last join iteration.
switch (status.getOutcome()) {
- case BATCH_RETURNED:
- // only return new schema if new worker has been setup.
- logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
- setRecordCountInContainer();
- return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
- case FAILURE:
- kill(false);
- return IterOutcome.STOP;
- case NO_MORE_DATA:
- logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
- setRecordCountInContainer();
- state = BatchState.DONE;
- return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
- case SCHEMA_CHANGED:
- worker = null;
- if (status.getOutPosition() > 0) {
- // if we have current data, let's return that.
- logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
+ case BATCH_RETURNED:
+ // only return new schema if new worker has been setup.
+ logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
setRecordCountInContainer();
return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
- }else{
- // loop again to rebuild worker.
- continue;
- }
- case WAITING:
- return IterOutcome.NOT_YET;
- default:
- throw new IllegalStateException();
+ case FAILURE:
+ status.left.clearInflightBatches();
+ status.right.clearInflightBatches();
+ kill(false);
+ return IterOutcome.STOP;
+ case NO_MORE_DATA:
+ logger.debug("NO MORE DATA; returning {}",
+ (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" : "NONE")));
+ setRecordCountInContainer();
+ state = BatchState.DONE;
+ return (first? IterOutcome.OK_NEW_SCHEMA : (status.getOutPosition() > 0 ? IterOutcome.OK: IterOutcome.NONE));
+ case SCHEMA_CHANGED:
+ worker = null;
+ if (status.getOutPosition() > 0) {
+ // if we have current data, let's return that.
+ logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
+ setRecordCountInContainer();
+ return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
+ } else{
+ // loop again to rebuild worker.
+ continue;
+ }
+ case WAITING:
+ return IterOutcome.NOT_YET;
+ default:
+ throw new IllegalStateException();
}
}
}
@@ -255,13 +251,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
}
- public void resetBatchBuilder() {
- batchBuilder.close();
- batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
- }
-
- public void addRightToBatchBuilder() {
- batchBuilder.add(right);
+ @Override
+ public void close() {
+ super.close();
+ leftIterator.close();
+ rightIterator.close();
}
@Override
@@ -270,69 +264,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
right.kill(sendUpstream);
}
- private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
- LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, JVar joinStatus,
- ErrorCollector collector) throws ClassTransformationException {
- boolean nextLeftIndexDeclared = false;
-
- cg.setMappingSet(compareLeftMapping);
-
- for (int i = 0; i < leftExpression.length; i++) {
-
- // materialize value vector readers from join expression
- final LogicalExpression materializedLeftExpr = leftExpression[i];
-
- // generate compareNextLeftKey()
- ////////////////////////////////
- cg.setMappingSet(compareLeftMapping);
- cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
-
- if (!nextLeftIndexDeclared) {
- // int nextLeftIndex = leftIndex + 1;
- cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "nextLeftIndex", JExpr.direct("leftIndex").plus(JExpr.lit(1)));
- nextLeftIndexDeclared = true;
- }
- // check if the next key is in this batch
- cg.getEvalBlock()._if(joinStatus.invoke("isNextLeftPositionInCurrentBatch").eq(JExpr.lit(false)))
- ._then()
- ._return(JExpr.lit(-1));
-
- // generate VV read expressions
- ClassGenerator.HoldingContainer compareThisLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
- cg.setMappingSet(compareNextLeftMapping); // change mapping from 'leftIndex' to 'nextLeftIndex'
- ClassGenerator.HoldingContainer compareNextLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
-
- if (compareThisLeftExprHolder.isOptional()) {
- // handle null == null
- cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
- .cand(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
- ._then()
- ._return(JExpr.lit(0));
-
- // handle null == !null
- cg.getEvalBlock()._if(compareThisLeftExprHolder.getIsSet().eq(JExpr.lit(0))
- .cor(compareNextLeftExprHolder.getIsSet().eq(JExpr.lit(0))))
- ._then()
- ._return(JExpr.lit(1));
- }
-
- // check value equality
-
- LogicalExpression gh =
- FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareThisLeftExprHolder,
- compareNextLeftExprHolder,
- context.getFunctionRegistry());
- HoldingContainer out = cg.addExpr(gh, false);
-
- // If not 0, it means not equal. We return this out value.
- JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- jc._then()._return(out.getValue());
- }
-
- //Pass the equality check for all the join conditions. Finally, return 0.
- cg.getEvalBlock()._return(JExpr.lit(0));
- }
-
private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -353,7 +284,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
cg.getSetupBlock().assign(JExpr._this().ref(outgoingVectorContainer), JExpr.direct("outgoing"));
// declare and assign incoming left RecordBatch member
- JClass recordBatchClass = cg.getModel().ref(RecordBatch.class);
+ JClass recordBatchClass = cg.getModel().ref(RecordIterator.class);
JVar incomingLeftRecordBatch = cg.clazz.field(JMod.NONE, recordBatchClass, "incomingLeft");
cg.getSetupBlock().assign(JExpr._this().ref(incomingLeftRecordBatch), joinStatus.ref("left"));
@@ -371,34 +302,30 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
*/
LogicalExpression leftExpr[] = new LogicalExpression[conditions.size()];
LogicalExpression rightExpr[] = new LogicalExpression[conditions.size()];
- IterOutcome lastLeftStatus = status.getLastLeft();
- IterOutcome lastRightStatus = status.getLastRight();
+ IterOutcome lastLeftStatus = status.getLeftStatus();
+ IterOutcome lastRightStatus = status.getRightStatus();
for (int i = 0; i < conditions.size(); i++) {
JoinCondition condition = conditions.get(i);
- leftExpr[i] = materializeExpression(condition.getLeft(), lastLeftStatus, left, collector);
- rightExpr[i] = materializeExpression(condition.getRight(), lastRightStatus, right, collector);
+ leftExpr[i] = materializeExpression(condition.getLeft(), lastLeftStatus, leftIterator, collector);
+ rightExpr[i] = materializeExpression(condition.getRight(), lastRightStatus, rightIterator, collector);
}
// if right side is empty, rightExpr will most likely default to NULLABLE INT which may cause the following
// call to throw an exception. In this case we can safely skip adding the casts
if (lastRightStatus != IterOutcome.NONE) {
- JoinUtils.addLeastRestrictiveCasts(leftExpr, left, rightExpr, right, context);
+ JoinUtils.addLeastRestrictiveCasts(leftExpr, leftIterator, rightExpr, rightIterator, context);
}
//generate doCompare() method
/////////////////////////////////////////
generateDoCompare(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, rightExpr,
- incomingRightRecordBatch, collector);
-
- //generate doCompareNextLeftKey() method
- /////////////////////////////////////////
- generateDoCompareNextLeft(cg, incomingRecordBatch, leftExpr, incomingLeftRecordBatch, joinStatus, collector);
+ incomingRightRecordBatch, collector);
// generate copyLeft()
//////////////////////
cg.setMappingSet(copyLeftMapping);
int vectorId = 0;
- if (worker == null || status.isLeftPositionAllowed()) {
- for (VectorWrapper<?> vw : left) {
+ if (worker == null || !status.left.finished()) {
+ for (VectorWrapper<?> vw : leftIterator) {
MajorType inputType = vw.getField().getType();
MajorType outputType;
if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
@@ -406,15 +333,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
outputType = inputType;
}
+ // TODO (DRILL-4011): Factor out CopyUtil and use it here.
JVar vvIn = cg.declareVectorValueSetupAndMember("incomingLeft",
- new TypedFieldId(inputType, vectorId));
+ new TypedFieldId(inputType, vectorId));
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
- new TypedFieldId(outputType,vectorId));
+ new TypedFieldId(outputType,vectorId));
// todo: check result of copyFromSafe and grow allocation
cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
- .arg(copyLeftMapping.getValueReadIndex())
- .arg(copyLeftMapping.getValueWriteIndex())
- .arg(vvIn));
+ .arg(copyLeftMapping.getValueReadIndex())
+ .arg(copyLeftMapping.getValueWriteIndex())
+ .arg(vvIn));
++vectorId;
}
}
@@ -424,8 +352,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
cg.setMappingSet(copyRightMappping);
int rightVectorBase = vectorId;
- if (status.getLastRight() != IterOutcome.NONE && (worker == null || status.isRightPositionAllowed())) {
- for (VectorWrapper<?> vw : right) {
+ if (status.getRightStatus() != IterOutcome.NONE && (worker == null || !status.right.finished())) {
+ for (VectorWrapper<?> vw : rightIterator) {
MajorType inputType = vw.getField().getType();
MajorType outputType;
if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
@@ -433,15 +361,16 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
outputType = inputType;
}
+ // TODO (DRILL-4011): Factor out CopyUtil and use it here.
JVar vvIn = cg.declareVectorValueSetupAndMember("incomingRight",
- new TypedFieldId(inputType, vectorId - rightVectorBase));
+ new TypedFieldId(inputType, vectorId - rightVectorBase));
JVar vvOut = cg.declareVectorValueSetupAndMember("outgoing",
- new TypedFieldId(outputType,vectorId));
+ new TypedFieldId(outputType,vectorId));
// todo: check result of copyFromSafe and grow allocation
cg.getEvalBlock().add(vvOut.invoke("copyFromSafe")
- .arg(copyRightMappping.getValueReadIndex())
- .arg(copyRightMappping.getValueWriteIndex())
- .arg(vvIn));
+ .arg(copyRightMappping.getValueReadIndex())
+ .arg(copyRightMappping.getValueWriteIndex())
+ .arg(vvIn));
++vectorId;
}
}
@@ -455,13 +384,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// allocate new batch space.
container.zeroVectors();
- boolean leftAllowed = status.getLastLeft() != IterOutcome.NONE;
- boolean rightAllowed = status.getLastRight() != IterOutcome.NONE;
+ boolean leftAllowed = status.getLeftStatus() != IterOutcome.NONE;
+ boolean rightAllowed = status.getRightStatus() != IterOutcome.NONE;
if (newSchema) {
- // add fields from both batches
+ // add fields from both batches
if (leftAllowed) {
- for (VectorWrapper<?> w : left) {
+ for (VectorWrapper<?> w : leftIterator) {
MajorType inputType = w.getField().getType();
MajorType outputType;
if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
@@ -477,9 +406,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
}
}
-
if (rightAllowed) {
- for (VectorWrapper<?> w : right) {
+ for (VectorWrapper<?> w : rightIterator) {
MajorType inputType = w.getField().getType();
MajorType outputType;
if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
@@ -496,7 +424,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
}
}
-
for (VectorWrapper w : container) {
AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE);
}
@@ -506,16 +433,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
private void generateDoCompare(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
- LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression,
- JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
+ LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, LogicalExpression[] rightExpression,
+ JVar incomingRightRecordBatch, ErrorCollector collector) throws ClassTransformationException {
cg.setMappingSet(compareMapping);
- if (status.getLastRight() != IterOutcome.NONE) {
+ if (status.getRightStatus() != IterOutcome.NONE) {
assert leftExpression.length == rightExpression.length;
for (int i = 0; i < leftExpression.length; i++) {
-
-
// generate compare()
////////////////////////
cg.setMappingSet(compareMapping);
@@ -527,17 +452,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(rightExpression[i], false);
LogicalExpression fh =
- FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareLeftExprHolder,
- compareRightExprHolder,
- context.getFunctionRegistry());
+ FunctionGenerationHelper.getOrderingComparatorNullsHigh(compareLeftExprHolder,
+ compareRightExprHolder,
+ context.getFunctionRegistry());
HoldingContainer out = cg.addExpr(fh, false);
// If not 0, it means not equal.
// Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal.
if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()
- && ! areNullsEqual) {
+ && ! areNullsEqual) {
JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
- cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
+ cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
jc._then()._return(JExpr.lit(1));
jc._elseif(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
} else {
@@ -551,7 +476,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
private LogicalExpression materializeExpression(LogicalExpression expression, IterOutcome lastStatus,
- RecordBatch input, ErrorCollector collector) throws ClassTransformationException {
+ VectorAccessible input, ErrorCollector collector) throws ClassTransformationException {
LogicalExpression materializedExpr = null;
if (lastStatus != IterOutcome.NONE) {
materializedExpr = ExpressionTreeMaterializer.materialize(expression, input, collector, context.getFunctionRegistry());
@@ -560,8 +485,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
- "Failure while trying to materialize incoming field from %s batch. Errors:\n %s.",
- (input == left ? LEFT_INPUT : RIGHT_INPUT), collector.toErrorString()));
+ "Failure while trying to materialize incoming field from %s batch. Errors:\n %s.",
+ (input == leftIterator ? LEFT_INPUT : RIGHT_INPUT), collector.toErrorString()));
}
return materializedExpr;
}
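
A hedged sketch (not part of the patch) of one pass through MergeJoinBatch.innerNext() as reorganized above; allocation, schema handling and exception handling are omitted. The names worker, context, container and generateNewWorker() all appear in the hunks above.

  // Sketch only: one innerNext() pass, condensed from the hunks above.
  status.prepare();                                // position both RecordIterators
  if (worker == null) {
    worker = generateNewWorker();                  // code-generates doCompare/doCopyLeft/doCopyRight
    worker.setupJoin(context, status, container);  // bind generated code to iterators and outgoing container
  }
  worker.doJoin(status);                           // fill the outgoing batch; may stop with a mark held on the right
  // status.getOutcome() is then mapped to OK / OK_NEW_SCHEMA / NONE / STOP as in the switch above.
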
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
deleted file mode 100644
index 279801022..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.join;
-
-import io.netty.buffer.DrillBuf;
-
-import java.util.List;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
-
-import com.google.common.collect.ArrayListMultimap;
-
-public class MergeJoinBatchBuilder implements AutoCloseable {
-
- private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create();
- private VectorContainer container;
- private int runningBytes;
- private int runningBatches;
- private int recordCount;
- private PreAllocator svAllocator;
- private boolean svAllocatorUsed = false;
- private JoinStatus status;
-
- public MergeJoinBatchBuilder(BufferAllocator allocator, JoinStatus status) {
- this.container = new VectorContainer();
- this.status = status;
- this.svAllocator = allocator.getNewPreAllocator();
- }
-
- public boolean add(RecordBatch batch) {
- if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) {
- throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
- }
- if (batch.getRecordCount() == 0) {
- return true; // skip over empty record batches.
- }
-
- // resource checks
- long batchBytes = getSize(batch);
- if (batchBytes + runningBytes > Integer.MAX_VALUE) {
- return false; // TODO: 2GB is arbitrary
- }
- if (runningBatches++ >= Character.MAX_VALUE) {
- return false; // allowed in batch.
- }
- if (!svAllocator.preAllocate(batch.getRecordCount()*4)) {
- return false; // sv allocation available.
- }
-
- // transfer VVs to a new RecordBatchData
- RecordBatchData bd = new RecordBatchData(batch);
- runningBytes += batchBytes;
- queuedRightBatches.put(batch.getSchema(), bd);
- recordCount += bd.getRecordCount();
- return true;
- }
-
- private long getSize(RecordBatch batch) {
- long bytes = 0;
- for (VectorWrapper<?> v : batch) {
- bytes += v.getValueVector().getBufferSize();
- }
- return bytes;
- }
-
- public void build() throws SchemaChangeException {
- container.clear();
- if (queuedRightBatches.size() > Character.MAX_VALUE) {
- throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
- }
- final DrillBuf drillBuf = svAllocator.getAllocation();
- svAllocatorUsed = true;
- status.sv4 = new SelectionVector4(drillBuf, recordCount, Character.MAX_VALUE);
- BatchSchema schema = queuedRightBatches.keySet().iterator().next();
- List<RecordBatchData> data = queuedRightBatches.get(schema);
-
- // now we're going to generate the sv4 pointers
- switch (schema.getSelectionVectorMode()) {
- case NONE: {
- int index = 0;
- int recordBatchId = 0;
- for (RecordBatchData d : data) {
- for (int i =0; i < d.getRecordCount(); i++, index++) {
- status.sv4.set(index, recordBatchId, i);
- }
- recordBatchId++;
- }
- break;
- }
- case TWO_BYTE: {
- int index = 0;
- int recordBatchId = 0;
- for (RecordBatchData d : data) {
- for (int i =0; i < d.getRecordCount(); i++, index++) {
- status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
- }
- // might as well drop the selection vector since we'll stop using it now.
- d.getSv2().clear();
- recordBatchId++;
- }
- break;
- }
- default:
- throw new UnsupportedOperationException();
- }
-
- // next, we'll create lists of each of the vector types.
- ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
- for (RecordBatchData rbd : queuedRightBatches.values()) {
- for (ValueVector v : rbd.getVectors()) {
- vectors.put(v.getField(), v);
- }
- }
-
- for (MaterializedField f : vectors.keySet()) {
- List<ValueVector> v = vectors.get(f);
- container.addHyperList(v);
- }
-
- container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
- }
-
- @Override
- public void close() {
- if (!svAllocatorUsed) {
- final DrillBuf drillBuf = svAllocator.getAllocation();
- if (drillBuf != null) {
- drillBuf.release();
- }
- }
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index f68bed156..6bfb483e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -159,4 +159,19 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
vectors = (T[]) ArrayUtils.add(vectors, vv);
}
+ /**
+ * Transfer vectors to destination HyperVectorWrapper.
+ * Both this wrapper and the destination must be of the same type and have the same number of vectors.
+ * @param destination destination HyperVectorWrapper.
+ */
+ public void transfer(VectorWrapper<?> destination) {
+ Preconditions.checkArgument(destination instanceof HyperVectorWrapper);
+ Preconditions.checkArgument(getField().getType().equals(destination.getField().getType()));
+ Preconditions.checkArgument(vectors.length == ((HyperVectorWrapper)destination).vectors.length);
+
+ ValueVector[] destinationVectors = ((HyperVectorWrapper)destination).vectors;
+ for (int i = 0; i < vectors.length; ++i) {
+ vectors[i].makeTransferPair(destinationVectors[i]).transfer();
+ }
+ }
}
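
The new RecordIterator (next file) presents a stream of incoming batches as a single record-at-a-time cursor with mark/reset support. A minimal consumption sketch (not part of the patch), limited to the methods exercised by JoinStatus and JoinTemplate above; incoming, parentBatch and oContext are assumed to come from the enclosing operator, as in the MergeJoinBatch constructor.

  // Sketch only: basic iteration protocol of the new RecordIterator.
  RecordIterator it = new RecordIterator(incoming, parentBatch, oContext, 0 /* input index */);
  it.next();                                 // pull the first batch and position on its first record
  while (!it.finished()) {
    final int idx = it.getCurrentPosition(); // index into the currently loaded vector container
    // ... read value vectors through the VectorAccessible interface at idx ...
    it.next();                               // advance one record, fetching the next batch when needed
  }
  it.close();                                // release any batches the iterator still holds
  // mark()/reset(), used by the merge join, remember a record and rewind to it, even across batch boundaries.
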
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
new file mode 100644
index 000000000..faa4d830d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeRangeMap;
+
+/**
+ * RecordIterator iterates over incoming record batches one record at a time.
+ * It allows the caller to mark a position during iteration and later reset back to it.
+ * RecordIterator holds onto multiple record batches so that a reset can move back across record batch boundaries.
+ */
+public class RecordIterator implements VectorAccessible {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordIterator.class);
+
+ private final RecordBatch incoming;
+ private final AbstractRecordBatch<?> outgoing;
+ private long outerPosition; // Tracks total records consumed so far, works across batches.
+ private int innerPosition; // Index within current vector container.
+ private int innerRecordCount; // Records in current container.
+ private long totalRecordCount; // Total records read so far.
+ private long startBatchPosition; // Start offset of current batch.
+ private int markedInnerPosition;
+ private long markedOuterPosition;
+ private IterOutcome lastOutcome;
+  private int inputIndex; // For a two-way merge join: 0 = left, 1 = right.
+ private boolean lastBatchRead; // True if all batches are consumed.
+ private boolean initialized;
+
+ private final VectorContainer container; // Holds VectorContainer of current record batch
+ private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create();
+
+ public RecordIterator(RecordBatch incoming,
+ AbstractRecordBatch<?> outgoing,
+ OperatorContext oContext,
+ int inputIndex) {
+ this.incoming = incoming;
+ this.outgoing = outgoing;
+ this.inputIndex = inputIndex;
+ this.lastBatchRead = false;
+ this.container = new VectorContainer(oContext);
+ resetIndices();
+ this.initialized = false;
+ }
+
+ private void resetIndices() {
+ this.innerPosition = -1;
+ this.startBatchPosition = -1;
+ this.outerPosition = -1;
+ this.totalRecordCount = 0;
+ this.innerRecordCount = 0;
+ this.markedInnerPosition = -1;
+ this.markedOuterPosition = -1;
+ }
+
+ // Get next record batch.
+ private void nextBatch() {
+ // We have already seen last batch.
+ if (lastBatchRead) {
+ return;
+ }
+ lastOutcome = outgoing.next(inputIndex, incoming);
+ }
+
+ public void mark() {
+ // Release all batches before current batch. [0 to startBatchPosition).
+ final Map<Range<Long>,RecordBatchData> oldBatches = batches.subRangeMap(Range.closedOpen(0l, startBatchPosition)).asMapOfRanges();
+ for (Range<Long> range : oldBatches.keySet()) {
+      oldBatches.get(range).clear();
+ }
+ batches.remove(Range.closedOpen(0l, startBatchPosition));
+ markedInnerPosition = innerPosition;
+ markedOuterPosition = outerPosition;
+ }
+
+ public void reset() {
+ if (markedOuterPosition >= 0) {
+ // Move to rbd for markedOuterPosition.
+ final RecordBatchData rbdNew = batches.get(markedOuterPosition);
+ final RecordBatchData rbdOld = batches.get(startBatchPosition);
+ Preconditions.checkArgument(rbdOld != null);
+ Preconditions.checkArgument(rbdNew != null);
+ if (rbdNew != rbdOld) {
+ container.transferOut(rbdOld.getContainer());
+ container.transferIn(rbdNew.getContainer());
+ }
+ innerPosition = markedInnerPosition;
+ outerPosition = markedOuterPosition;
+ startBatchPosition = batches.getEntry(outerPosition).getKey().lowerEndpoint();
+ innerRecordCount = (int)(batches.getEntry(outerPosition).getKey().upperEndpoint() - startBatchPosition);
+ markedInnerPosition = -1;
+ markedOuterPosition = -1;
+ }
+ }
+
+ // Move forward by delta (may cross one or more record batches)
+ public void forward(long delta) {
+ Preconditions.checkArgument(delta >= 0);
+ Preconditions.checkArgument(delta + outerPosition < totalRecordCount);
+ final long nextOuterPosition = delta + outerPosition;
+ final RecordBatchData rbdNew = batches.get(nextOuterPosition);
+ final RecordBatchData rbdOld = batches.get(outerPosition);
+ Preconditions.checkArgument(rbdNew != null);
+ Preconditions.checkArgument(rbdOld != null);
+ container.transferOut(rbdOld.getContainer());
+ // Get vectors from new position.
+ container.transferIn(rbdNew.getContainer());
+ outerPosition = nextOuterPosition;
+ startBatchPosition = batches.getEntry(outerPosition).getKey().lowerEndpoint();
+ innerPosition = (int)(outerPosition - startBatchPosition);
+ innerRecordCount = (int)(batches.getEntry(outerPosition).getKey().upperEndpoint() - startBatchPosition);
+ }
+
+ /**
+   * buildSchema calls next() in order to read the schema quickly.
+   * Make sure the next non-empty batch has been fetched by the end of prepare.
+   * After prepare the iterator is positioned at record 0.
+ */
+ public void prepare() {
+ while (!lastBatchRead && outerPosition == -1) {
+ next();
+ }
+ }
+
+ /**
+ * Move iterator to next record.
+ * @return
+ * Status of current record batch read.
+ */
+ public IterOutcome next() {
+ if (finished()) {
+ return lastOutcome;
+ }
+ long nextOuterPosition = outerPosition + 1;
+ final int nextInnerPosition = innerPosition + 1;
+ if (!initialized || nextOuterPosition >= totalRecordCount) {
+ nextBatch();
+ switch (lastOutcome) {
+ case NONE:
+ case STOP:
+ // No more data, disallow reads unless reset is called.
+ outerPosition = nextOuterPosition;
+ lastBatchRead = true;
+ break;
+ case OK_NEW_SCHEMA:
+ case OK:
+          // If the schema changes in the middle of execution, clear out the accumulated data.
+ if (initialized && lastOutcome == IterOutcome.OK_NEW_SCHEMA) {
+ clear();
+ resetIndices();
+ initialized = false;
+ nextOuterPosition = 0;
+ }
+ // Transfer vectors from incoming record batch.
+ final RecordBatchData rbd = new RecordBatchData(incoming);
+ innerRecordCount = incoming.getRecordCount();
+ if (!initialized) {
+ for (VectorWrapper<?> w : rbd.getContainer()) {
+ container.addOrGet(w.getField());
+ }
+ container.buildSchema(rbd.getContainer().getSchema().getSelectionVectorMode());
+ initialized = true;
+ }
+ if (innerRecordCount > 0) {
+ // Transfer vectors back to old batch.
+ if (startBatchPosition != -1 && batches.get(startBatchPosition) != null) {
+ container.transferOut(batches.get(outerPosition).getContainer());
+ }
+ container.transferIn(rbd.getContainer());
+ startBatchPosition = nextOuterPosition;
+ batches.put(Range.closedOpen(nextOuterPosition, nextOuterPosition + innerRecordCount), rbd);
+ innerPosition = 0;
+ outerPosition = nextOuterPosition;
+ totalRecordCount += innerRecordCount;
+ } else {
+ // Release schema/empty batches.
+ rbd.clear();
+ }
+ break;
+ case OUT_OF_MEMORY:
+ return lastOutcome;
+ case NOT_YET:
+ default:
+ throw new UnsupportedOperationException("Unsupported outcome received " + lastOutcome);
+ }
+ } else {
+ outerPosition = nextOuterPosition;
+ innerPosition = nextInnerPosition;
+ }
+ return lastOutcome;
+ }
+
+ public boolean finished() {
+ return lastBatchRead && outerPosition >= totalRecordCount;
+ }
+
+ public IterOutcome getLastOutcome() {
+ return lastOutcome;
+ }
+
+ public long getTotalRecordCount() {
+ return totalRecordCount;
+ }
+
+ public int getInnerRecordCount() {
+ return innerRecordCount;
+ }
+
+ public long getOuterPosition() {
+ return outerPosition;
+ }
+
+ public int getCurrentPosition() {
+ Preconditions.checkArgument(initialized);
+ Preconditions.checkArgument(innerPosition >= 0 && innerPosition < innerRecordCount);
+ return innerPosition;
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ Preconditions.checkArgument(initialized);
+ return container.getValueAccessorById(clazz, ids);
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ Preconditions.checkArgument(initialized);
+ return container.getValueVectorId(path);
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ Preconditions.checkArgument(initialized);
+ return container.getSchema();
+ }
+
+ @Override
+ public int getRecordCount() {
+ Preconditions.checkArgument(initialized);
+ return innerRecordCount;
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ Preconditions.checkArgument(initialized);
+ return container.iterator();
+ }
+
+ // Release all vectors held by record batches, clear out range map.
+ public void clear() {
+ if (container != null) {
+ container.clear();
+ }
+ for (RecordBatchData d : batches.asMapOfRanges().values()) {
+ d.clear();
+ }
+ batches.clear();
+ }
+
+ // Deplete incoming batches.
+ public void clearInflightBatches() {
+ while (lastOutcome == IterOutcome.OK || lastOutcome == IterOutcome.OK_NEW_SCHEMA) {
+ // Clear all buffers from incoming.
+ for (VectorWrapper<?> wrapper : incoming) {
+ wrapper.getValueVector().clear();
+ }
+ lastOutcome = incoming.next();
+ }
+ }
+
+ public void close() {
+ clear();
+ clearInflightBatches();
+ }
+}
\ No newline at end of file
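For orientation, here is a minimal usage sketch of the iteration contract introduced above. The wrapper class, method, and flag are hypothetical; only the RecordIterator calls themselves (prepare, finished, mark, getCurrentPosition, reset, next, close) come from this patch.

import org.apache.drill.exec.record.RecordIterator;

// Hypothetical driver showing how an operator (e.g. a merge join worker) could
// walk a RecordIterator one record at a time, with mark/reset across batches.
class RecordIteratorSketch {
  void drain(final RecordIterator it, final boolean rescanEachRecord) {
    it.prepare();                                 // fetch the first non-empty batch; position starts at 0
    while (!it.finished()) {
      it.mark();                                  // remember this record; batches before the current one may be released
      final int index = it.getCurrentPosition();  // index into the current batch's vectors
      // ... read or compare the record at 'index' through the VectorAccessible view ...
      if (rescanEachRecord) {
        it.reset();                               // jump back to the marked record, even across batch boundaries
      }
      it.next();                                  // advance one record, pulling the next batch when needed
    }
    it.close();                                   // release held batches and drain any remaining input
  }
}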
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 85ec4667a..f767e74ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -33,32 +33,32 @@ import org.apache.drill.exec.vector.complex.ListVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.UnionVector;
-import java.util.ArrayList;
import java.util.List;
+import com.google.common.base.Preconditions;
public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
- private T v;
+ private T vector;
public SimpleVectorWrapper(T v) {
- this.v = v;
+ this.vector = v;
}
@SuppressWarnings("unchecked")
@Override
public Class<T> getVectorClass() {
- return (Class<T>) v.getClass();
+ return (Class<T>) vector.getClass();
}
@Override
public MaterializedField getField() {
- return v.getField();
+ return vector.getField();
}
@Override
public T getValueVector() {
- return v;
+ return vector;
}
@Override
@@ -74,14 +74,14 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
@SuppressWarnings("unchecked")
@Override
public VectorWrapper<T> cloneAndTransfer() {
- TransferPair tp = v.getTransferPair();
+ TransferPair tp = vector.getTransferPair();
tp.transfer();
return new SimpleVectorWrapper<T>((T) tp.getTo());
}
@Override
public void clear() {
- v.clear();
+ vector.clear();
}
public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v) {
@@ -95,7 +95,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
return this;
}
- ValueVector vector = v;
+ ValueVector vector = this.vector;
for (int i = 1; i < ids.length; i++) {
final AbstractMapVector mapLike = AbstractMapVector.class.cast(vector);
if (mapLike == null) {
@@ -109,15 +109,15 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
@Override
public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
- if (!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) {
+ if (!expectedPath.getRootSegment().segmentEquals(vector.getField().getPath().getRootSegment())) {
return null;
}
PathSegment seg = expectedPath.getRootSegment();
- if (v instanceof UnionVector) {
+ if (vector instanceof UnionVector) {
TypedFieldId.Builder builder = TypedFieldId.newBuilder();
builder.addId(id).remainder(expectedPath.getRootSegment().getChild());
- List<MinorType> minorTypes = ((UnionVector) v).getSubTypes();
+ List<MinorType> minorTypes = ((UnionVector) vector).getSubTypes();
MajorType.Builder majorTypeBuilder = MajorType.newBuilder().setMinorType(MinorType.UNION);
for (MinorType type : minorTypes) {
majorTypeBuilder.addSubType(type);
@@ -128,28 +128,28 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
builder.finalType(majorType);
return builder.build();
} else {
- return ((UnionVector) v).getFieldIdIfMatches(builder, false, seg.getChild());
+ return ((UnionVector) vector).getFieldIdIfMatches(builder, false, seg.getChild());
}
- } else if (v instanceof ListVector) {
- ListVector list = (ListVector) v;
+ } else if (vector instanceof ListVector) {
+ ListVector list = (ListVector) vector;
TypedFieldId.Builder builder = TypedFieldId.newBuilder();
- builder.intermediateType(v.getField().getType());
+ builder.intermediateType(vector.getField().getType());
builder.addId(id);
return list.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
} else
- if (v instanceof AbstractContainerVector) {
+ if (vector instanceof AbstractContainerVector) {
// we're looking for a multi path.
- AbstractContainerVector c = (AbstractContainerVector) v;
+ AbstractContainerVector c = (AbstractContainerVector) vector;
TypedFieldId.Builder builder = TypedFieldId.newBuilder();
- builder.intermediateType(v.getField().getType());
+ builder.intermediateType(vector.getField().getType());
builder.addId(id);
return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
} else {
TypedFieldId.Builder builder = TypedFieldId.newBuilder();
- builder.intermediateType(v.getField().getType());
+ builder.intermediateType(vector.getField().getType());
builder.addId(id);
- builder.finalType(v.getField().getType());
+ builder.finalType(vector.getField().getType());
if (seg.isLastPath()) {
return builder.build();
} else {
@@ -157,7 +157,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
if (child.isArray() && child.isLastPath()) {
builder.remainder(child);
builder.withIndex();
- builder.finalType(v.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build());
+ builder.finalType(vector.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build());
return builder.build();
} else {
return null;
@@ -167,4 +167,10 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
}
}
+ public void transfer(VectorWrapper<?> destination) {
+ Preconditions.checkArgument(destination instanceof SimpleVectorWrapper);
+ Preconditions.checkArgument(getField().getType().equals(destination.getField().getType()));
+ vector.makeTransferPair(((SimpleVectorWrapper)destination).vector).transfer();
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 57278247f..ef22d521d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
@@ -80,6 +81,25 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
add(vv, releasable);
}
+ /**
+ * Transfer vectors from containerIn to this.
+ */
+ void transferIn(VectorContainer containerIn) {
+ Preconditions.checkArgument(this.wrappers.size() == containerIn.wrappers.size());
+ for (int i = 0; i < this.wrappers.size(); ++i) {
+ containerIn.wrappers.get(i).transfer(this.wrappers.get(i));
+ }
+ }
+
+ /**
+   * Transfer vectors from this to containerOut.
+ */
+ void transferOut(VectorContainer containerOut) {
+ Preconditions.checkArgument(this.wrappers.size() == containerOut.wrappers.size());
+ for (int i = 0; i < this.wrappers.size(); ++i) {
+ this.wrappers.get(i).transfer(containerOut.wrappers.get(i));
+ }
+ }
public <T extends ValueVector> T addOrGet(MaterializedField field) {
return addOrGet(field, null);
@@ -97,6 +117,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
return (T) newVector;
}
} else {
+
vector = TypeHelper.getNewVector(field, this.oContext.getAllocator(), callBack);
add(vector);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index dc8ffe582..5250f98a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -33,6 +33,7 @@ public interface VectorWrapper<T extends ValueVector> {
public void clear();
public VectorWrapper<T> cloneAndTransfer();
public VectorWrapper<?> getChildWrapper(int[] ids);
+ public void transfer(VectorWrapper<?> destination);
/**
* Traverse the object graph and determine whether the provided SchemaPath matches data within the Wrapper. If so, return a TypedFieldId associated with this path.
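A hedged illustration of the transfer method added to VectorWrapper: the simple wrapper only requires matching field types, while the hyper variant additionally requires both sides to hold the same number of vectors. The surrounding class is an assumption; the create and transfer calls are the ones shown in this patch, and the two vectors are assumed to describe the same field.

import org.apache.drill.exec.record.SimpleVectorWrapper;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.IntVector;

// Hypothetical example: move the buffers of one vector into another of the same
// field through the new wrapper-level transfer.
class WrapperTransferSketch {
  static void moveBuffers(final IntVector source, final IntVector target) {
    final VectorWrapper<IntVector> from = SimpleVectorWrapper.create(source);
    final VectorWrapper<IntVector> to = SimpleVectorWrapper.create(target);
    from.transfer(to); // equivalent to source.makeTransferPair(target).transfer()
  }
}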
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index 55173f93a..40ee63d42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -40,21 +40,22 @@ public class BatchPrinter {
numBatches = vw.getValueVectors().length;
}
int width = columns.size();
- for (int j = 0; j < sv4.getCount(); j++) {
- for (VectorWrapper vw : batch) {
- Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(j & 65535);
- if (o instanceof byte[]) {
- String value = new String((byte[]) o);
- System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
- } else {
- String value = o.toString();
- System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
- }
- }
- System.out.printf("|\n");
+ for (int j = 0; j < sv4.getCount(); j++) {
+ for (VectorWrapper vw : batch) {
+ Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
+ if (o instanceof byte[]) {
+ String value = new String((byte[]) o);
+ System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+ } else {
+ String value = o.toString();
+ System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
}
+ }
System.out.printf("|\n");
+ }
+ System.out.printf("|\n");
}
+
public static void printBatch(VectorAccessible batch) {
List<String> columns = Lists.newArrayList();
List<ValueVector> vectors = Lists.newArrayList();
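The BatchPrinter change above is more than re-indentation: an SV4 entry packs the batch index into its upper 16 bits and the in-batch record offset into its lower 16 bits, and the old code indexed the accessor with the loop counter j instead of the decoded offset. A small decoding sketch (the helper class is illustrative):

// Decode one packed 4-byte selection entry, as BatchPrinter now does inline.
final class Sv4EntrySketch {
  static int batchIndex(final int entry) {
    return entry >>> 16;    // upper 16 bits: which stacked batch holds the record
  }
  static int recordIndex(final int entry) {
    return entry & 0xFFFF;  // lower 16 bits: the record's offset within that batch
  }
}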
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 4b155ee2b..0b52b9e36 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -184,10 +184,10 @@ public class TestExampleQueries extends BaseTestQuery {
@Test
public void testPushExpInJoinConditionInnerJoin() throws Exception {
test("select a.n_nationkey from cp.`tpch/nation.parquet` a join cp.`tpch/region.parquet` b " + "" +
- " on a.n_regionkey + 100 = b.r_regionkey + 200" + // expressions in both sides of equal join filter
- " and (substr(a.n_name,1,3)= 'L1' or substr(a.n_name,2,2) = 'L2') " + // left filter
- " and (substr(b.r_name,1,3)= 'R1' or substr(b.r_name,2,2) = 'R2') " + // right filter
- " and (substr(a.n_name,2,3)= 'L3' or substr(b.r_name,3,2) = 'R3');"); // non-equal join filter
+ " on a.n_regionkey + 100 = b.r_regionkey + 200" + // expressions in both sides of equal join filter
+ " and (substr(a.n_name,1,3)= 'L1' or substr(a.n_name,2,2) = 'L2') " + // left filter
+ " and (substr(b.r_name,1,3)= 'R1' or substr(b.r_name,2,2) = 'R2') " + // right filter
+ " and (substr(a.n_name,2,3)= 'L3' or substr(b.r_name,3,2) = 'R3');"); // non-equal join filter
}
@Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
index 54539fd2e..ec7411d10 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestMergeJoinAdvanced.java
@@ -23,8 +23,15 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+
public class TestMergeJoinAdvanced extends BaseTestQuery {
// Have to disable hash join to test merge join in this class
@@ -96,4 +103,99 @@ public class TestMergeJoinAdvanced extends BaseTestQuery {
setSessionOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY, String.valueOf(ExecConstants.MAX_WIDTH_PER_NODE.getDefault().num_val));
}
}
+
+ private static void generateData(final BufferedWriter leftWriter, final BufferedWriter rightWriter,
+ final long left, final long right) throws IOException {
+ for (int i=0; i < left; ++i) {
+ leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10000, i));
+ }
+ leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10001, 10001));
+ leftWriter.write(String.format("{ \"k\" : %d , \"v\": %d }", 10002, 10002));
+
+ for (int i=0; i < right; ++i) {
+ rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10000, i));
+ }
+ rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10004, 10004));
+ rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10005, 10005));
+ rightWriter.write(String.format("{ \"k1\" : %d , \"v1\": %d }", 10006, 10006));
+
+ leftWriter.close();
+ rightWriter.close();
+ }
+
+ private static void testMultipleBatchJoin(final long right, final long left,
+ final String joinType, final long expected) throws Exception {
+ final String leftSide = BaseTestQuery.getTempDir("merge-join-left.json");
+ final String rightSide = BaseTestQuery.getTempDir("merge-join-right.json");
+ final BufferedWriter leftWriter = new BufferedWriter(new FileWriter(new File(leftSide)));
+ final BufferedWriter rightWriter = new BufferedWriter(new FileWriter(new File(rightSide)));
+ generateData(leftWriter, rightWriter, left, right);
+ final String query1 = String.format("select count(*) c1 from dfs_test.`%s` L %s join dfs_test.`%s` R on L.k=R.k1",
+ leftSide, joinType, rightSide);
+ testBuilder()
+ .sqlQuery(query1)
+ .optionSettingQueriesForTestQuery("alter session set `planner.enable_hashjoin` = false")
+ .unOrdered()
+ .baselineColumns("c1")
+ .baselineValues(expected)
+ .go();
+ }
+
+ @Test
+ public void testMergeInnerJoinLargeRight() throws Exception {
+ testMultipleBatchJoin(1000l, 5000l, "inner", 5000l * 1000l);
+ }
+
+ @Test
+ public void testMergeLeftJoinLargeRight() throws Exception {
+ testMultipleBatchJoin(1000l, 5000l, "left", 5000l * 1000l +2l);
+ }
+
+ @Test
+ public void testMergeRightJoinLargeRight() throws Exception {
+ testMultipleBatchJoin(1000l, 5000l, "right", 5000l*1000l +3l);
+ }
+
+ @Test
+ public void testMergeInnerJoinLargeLeft() throws Exception {
+ testMultipleBatchJoin(5000l, 1000l, "inner", 5000l*1000l);
+ }
+
+ @Test
+ public void testMergeLeftJoinLargeLeft() throws Exception {
+ testMultipleBatchJoin(5000l, 1000l, "left", 5000l*1000l + 2l);
+ }
+
+ @Test
+ public void testMergeRightJoinLargeLeft() throws Exception {
+ testMultipleBatchJoin(5000l, 1000l, "right", 5000l*1000l + 3l);
+ }
+
+ // Following tests can take some time.
+ @Test
+ @Ignore
+ public void testMergeInnerJoinRandomized() throws Exception {
+ final Random r = new Random();
+ final long right = r.nextInt(10001) + 1l;
+ final long left = r.nextInt(10001) + 1l;
+ testMultipleBatchJoin(left, right, "inner", left*right);
+ }
+
+ @Test
+ @Ignore
+ public void testMergeLeftJoinRandomized() throws Exception {
+ final Random r = new Random();
+ final long right = r.nextInt(10001) + 1l;
+ final long left = r.nextInt(10001) + 1l;
+ testMultipleBatchJoin(left, right, "left", left*right + 2l);
+ }
+
+ @Test
+ @Ignore
+ public void testMergeRightJoinRandomized() throws Exception {
+ final Random r = new Random();
+ final long right = r.nextInt(10001) + 1l;
+ final long left = r.nextInt(10001) + 1l;
+ testMultipleBatchJoin(left, right, "right", left * right + 3l);
+ }
}
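For readers checking the expected counts in the tests above: generateData gives every one of the left rows and every one of the right rows the shared key 10000, so an inner join produces left * right matches; a left join adds the two left-only keys (10001, 10002), giving left * right + 2; a right join adds the three right-only keys (10004, 10005, 10006), giving left * right + 3. For example, testMergeLeftJoinLargeRight expects 5000 * 1000 + 2 = 5,000,002 rows.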