aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/ops
diff options
context:
space:
mode:
authorweijie.tong <weijie.tong@alipay.com>2018-05-23 10:38:40 +0800
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-08-23 11:14:19 +0300
commitb895b28182a981e5948ffa292da827cb8b2e571e (patch)
tree760295df4624106050b929ebf0ed1c2aff7e1eb6 /exec/java-exec/src/main/java/org/apache/drill/exec/ops
parent71c6c689a083e7496f06e99b4d253f11866ee741 (diff)
DRILL-6385: Support JPPD feature
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/ops')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java2
5 files changed, 41 insertions, 2 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
index cf9fcfe2c..484629b94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java
@@ -23,6 +23,8 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+
/**
* Wrapper around a {@link org.apache.drill.exec.rpc.data.DataTunnel} that tracks the status of batches sent to
@@ -44,6 +46,11 @@ public class AccountingDataTunnel {
tunnel.sendRecordBatch(statusHandler, batch);
}
+ public void sendRuntimeFilter(RuntimeFilterWritable batch) {
+ sendingAccountor.increment();
+ tunnel.sendRuntimeFilter(statusHandler, batch);
+ }
+
/**
* See {@link DataTunnel#setTestInjectionControls(ControlsInjector, ExecutionControls, Logger)}.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 593e3d396..15161b421 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.testing.ExecutionControls;
import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
/**
* Provides the resources required by a non-exchange operator to execute.
@@ -158,6 +159,18 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
@Override
void close();
+ /**
+ * Return null ,if setRuntimeFilter not being called
+ * @return
+ */
+ RuntimeFilterWritable getRuntimeFilter();
+
+ /**
+ * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
+ * @param runtimeFilter
+ */
+ public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+
interface ExecutorState {
/**
* Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 503ebdded..8efbaca5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -67,6 +67,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
/**
* <p>
@@ -135,6 +136,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
/** Stores constants and their holders by type */
private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
+ private RuntimeFilterWritable runtimeFilterWritable;
+
/**
* Create a FragmentContext instance for non-root fragment.
*
@@ -344,6 +347,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
}
+ @Override
+ public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ this.runtimeFilterWritable = runtimeFilter;
+ }
+
+ @Override
+ public RuntimeFilterWritable getRuntimeFilter() {
+ return runtimeFilterWritable;
+ }
+
/**
* Get this fragment's allocator.
* @return the allocator
@@ -457,7 +470,9 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
for (OperatorContextImpl opContext : contexts) {
suppressingClose(opContext);
}
-
+ if (runtimeFilterWritable != null) {
+ suppressingClose(runtimeFilterWritable);
+ }
suppressingClose(bufferManager);
suppressingClose(allocator);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 07742f2e8..762614fa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -248,6 +248,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
}
+ public boolean isRuntimeFilterEnabled() {
+ return this.getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
+ }
+
public DrillOperatorTable getDrillOperatorTable() {
return table;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
index e8ce3c4de..eca4011f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SendingAccountor.java
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* TODO: Need to update to use long for number of pending messages.
*/
-class SendingAccountor {
+public class SendingAccountor {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SendingAccountor.class);
private final AtomicInteger batchesSent = new AtomicInteger(0);