diff options
author | vkorukanti <venki.korukanti@gmail.com> | 2015-03-15 13:23:03 -0700 |
---|---|---|
committer | vkorukanti <venki.korukanti@gmail.com> | 2015-03-15 13:23:03 -0700 |
commit | 34932e12619762f656c1faf3d9039888de0ad277 (patch) | |
tree | e55fe7eb6b9e1e903b27b75c62ecd8e9830fd30a /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl | |
parent | 7b4c887e852ed1fd526953e55df485a5aaae6e22 (diff) |
DRILL-2453: Handle the case where incoming has no schema in PartitionSender.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7df69a468..abf9cbcc9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; @@ -269,6 +270,13 @@ public class PartitionSenderRootExec extends BaseRootExec { } public void sendEmptyBatch(boolean isLast) { + BatchSchema schema = incoming.getSchema(); + if (schema == null) { + // If the incoming batch has no schema (possible when there are no input records), + // create an empty schema to avoid NPE. + schema = BatchSchema.newBuilder().build(); + } + FragmentHandle handle = context.getHandle(); StatusHandler statusHandler = new StatusHandler(sendCount, context); for (MinorFragmentEndpoint destination : popConfig.getDestinations()) { @@ -280,7 +288,7 @@ public class PartitionSenderRootExec extends BaseRootExec { handle.getMinorFragmentId(), operator.getOppositeMajorFragmentId(), destination.getId(), - incoming.getSchema()); + schema); stats.startWait(); try { tunnel.sendRecordBatch(statusHandler, writableBatch); |