diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7df69a468..abf9cbcc9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; @@ -269,6 +270,13 @@ public class PartitionSenderRootExec extends BaseRootExec { } public void sendEmptyBatch(boolean isLast) { + BatchSchema schema = incoming.getSchema(); + if (schema == null) { + // If the incoming batch has no schema (possible when there are no input records), + // create an empty schema to avoid NPE. + schema = BatchSchema.newBuilder().build(); + } + FragmentHandle handle = context.getHandle(); StatusHandler statusHandler = new StatusHandler(sendCount, context); for (MinorFragmentEndpoint destination : popConfig.getDestinations()) { @@ -280,7 +288,7 @@ public class PartitionSenderRootExec extends BaseRootExec { handle.getMinorFragmentId(), operator.getOppositeMajorFragmentId(), destination.getId(), - incoming.getSchema()); + schema); stats.startWait(); try { tunnel.sendRecordBatch(statusHandler, writableBatch); |