diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index a662adc6b..824060194 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -17,9 +17,11 @@ */ package org.apache.drill.exec.planner.fragment; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.work.foreman.ForemanSetupException; @@ -82,18 +84,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { return sendingExchange; } -// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) { -// return visitor.visit(this, extra); -// } - public class ExchangeFragmentPair { private Exchange exchange; - private Fragment node; + private Fragment fragmentXchgTo; - public ExchangeFragmentPair(Exchange exchange, Fragment node) { + public ExchangeFragmentPair(Exchange exchange, Fragment fragXchgTo) { super(); this.exchange = exchange; - this.node = node; + this.fragmentXchgTo = fragXchgTo; } public Exchange getExchange() { @@ -101,7 +99,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { } public Fragment getNode() { - return node; + return fragmentXchgTo; } @Override @@ -109,7 +107,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { final int prime = 31; int result = 1; result = prime * result + ((exchange == null) ? 0 : exchange.hashCode()); - result = prime * result + ((node == null) ? 0 : node.hashCode()); + result = prime * result + ((fragmentXchgTo == null) ? 0 : fragmentXchgTo.hashCode()); return result; } @@ -167,10 +165,27 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { return true; } + public List<PhysicalOperator> getBufferedOperators() { + List<PhysicalOperator> bufferedOps = new ArrayList<>(); + root.accept(new BufferedOpFinder(), bufferedOps); + return bufferedOps; + } + + protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> { + @Override + public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value) + throws RuntimeException { + if (op.isBufferedOperator(null)) { + value.add(op); + } + visitChildren(op, value); + return null; + } + } + @Override public String toString() { return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs=" + receivingExchangePairs + "]"; } - } |