diff options
author | weijie.tong <weijie.tong@alipay.com> | 2018-05-23 10:38:40 +0800 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-08-23 11:14:19 +0300 |
commit | b895b28182a981e5948ffa292da827cb8b2e571e (patch) | |
tree | 760295df4624106050b929ebf0ed1c2aff7e1eb6 /exec/java-exec/src/main/java/org/apache/drill/exec/ops | |
parent | 71c6c689a083e7496f06e99b4d253f11866ee741 (diff) |
DRILL-6385: Support JPPD feature
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/ops')
5 files changed, 41 insertions, 2 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java index cf9fcfe2c..484629b94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java @@ -23,6 +23,8 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ExecutionControls; +import org.apache.drill.exec.work.filter.RuntimeFilterWritable; + /** * Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status of batches sent to @@ -44,6 +46,11 @@ public class AccountingDataTunnel { tunnel.sendRecordBatch(statusHandler, batch); } + public void sendRuntimeFilter(RuntimeFilterWritable batch) { + sendingAccountor.increment(); + tunnel.sendRuntimeFilter(statusHandler, batch); + } + /** * See {@link DataTunnel#setTestInjectionControls(ControlsInjector, ExecutionControls, Logger)}. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 593e3d396..15161b421 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.testing.ExecutionControls; import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.work.filter.RuntimeFilterWritable; /** * Provides the resources required by a non-exchange operator to execute. @@ -158,6 +159,18 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable { @Override void close(); + /** + * Return null ,if setRuntimeFilter not being called + * @return + */ + RuntimeFilterWritable getRuntimeFilter(); + + /** + * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment + * @param runtimeFilter + */ + public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter); + interface ExecutorState { /** * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index 503ebdded..8efbaca5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -67,6 +67,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.work.filter.RuntimeFilterWritable; /** * <p> @@ -135,6 +136,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor /** Stores constants and their holders by type */ private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; + private RuntimeFilterWritable runtimeFilterWritable; + /** * Create a FragmentContext instance for non-root fragment. * @@ -344,6 +347,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED); } + @Override + public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) { + this.runtimeFilterWritable = runtimeFilter; + } + + @Override + public RuntimeFilterWritable getRuntimeFilter() { + return runtimeFilterWritable; + } + /** * Get this fragment's allocator. * @return the allocator @@ -457,7 +470,9 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } - + if (runtimeFilterWritable != null) { + suppressingClose(runtimeFilterWritable); + } suppressingClose(bufferManager); suppressingClose(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 07742f2e8..762614fa5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -248,6 +248,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED); } + public boolean isRuntimeFilterEnabled() { + return this.getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val; + } + public DrillOperatorTable getDrillOperatorTable() { return table; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java index e8ce3c4de..eca4011f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; * * TODO: Need to update to use long for number of pending messages. */ -class SendingAccountor { +public class SendingAccountor { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class); private final AtomicInteger batchesSent = new AtomicInteger(0); |