aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java35
1 files changed, 25 insertions, 10 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index a662adc6b..824060194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -17,9 +17,11 @@
*/
package org.apache.drill.exec.planner.fragment;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -82,18 +84,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
return sendingExchange;
}
-// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) {
-// return visitor.visit(this, extra);
-// }
-
public class ExchangeFragmentPair {
private Exchange exchange;
- private Fragment node;
+ private Fragment fragmentXchgTo;
- public ExchangeFragmentPair(Exchange exchange, Fragment node) {
+ public ExchangeFragmentPair(Exchange exchange, Fragment fragXchgTo) {
super();
this.exchange = exchange;
- this.node = node;
+ this.fragmentXchgTo = fragXchgTo;
}
public Exchange getExchange() {
@@ -101,7 +99,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
}
public Fragment getNode() {
- return node;
+ return fragmentXchgTo;
}
@Override
@@ -109,7 +107,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
final int prime = 31;
int result = 1;
result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
- result = prime * result + ((node == null) ? 0 : node.hashCode());
+ result = prime * result + ((fragmentXchgTo == null) ? 0 : fragmentXchgTo.hashCode());
return result;
}
@@ -167,10 +165,27 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
return true;
}
+ public List<PhysicalOperator> getBufferedOperators() {
+ List<PhysicalOperator> bufferedOps = new ArrayList<>();
+ root.accept(new BufferedOpFinder(), bufferedOps);
+ return bufferedOps;
+ }
+
+ protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
+ @Override
+ public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+ throws RuntimeException {
+ if (op.isBufferedOperator(null)) {
+ value.add(op);
+ }
+ visitChildren(op, value);
+ return null;
+ }
+ }
+
@Override
public String toString() {
return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs="
+ receivingExchangePairs + "]";
}
-
}