aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
diff options
context:
space:
mode:
authorvkorukanti <venki.korukanti@gmail.com>2015-03-15 13:23:03 -0700
committervkorukanti <venki.korukanti@gmail.com>2015-03-15 13:23:03 -0700
commit34932e12619762f656c1faf3d9039888de0ad277 (patch)
treee55fe7eb6b9e1e903b27b75c62ecd8e9830fd30a /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
parent7b4c887e852ed1fd526953e55df485a5aaae6e22 (diff)
DRILL-2453: Handle the case where incoming has no schema in PartitionSender.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java10
1 files changed, 9 insertions, 1 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7df69a468..abf9cbcc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -269,6 +270,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
public void sendEmptyBatch(boolean isLast) {
+ BatchSchema schema = incoming.getSchema();
+ if (schema == null) {
+ // If the incoming batch has no schema (possible when there are no input records),
+ // create an empty schema to avoid NPE.
+ schema = BatchSchema.newBuilder().build();
+ }
+
FragmentHandle handle = context.getHandle();
StatusHandler statusHandler = new StatusHandler(sendCount, context);
for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
@@ -280,7 +288,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
handle.getMinorFragmentId(),
operator.getOppositeMajorFragmentId(),
destination.getId(),
- incoming.getSchema());
+ schema);
stats.startWait();
try {
tunnel.sendRecordBatch(statusHandler, writableBatch);