diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 11 |
1 files changed, 3 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7e3f4b20e..cf7ba1610 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; @@ -101,7 +100,7 @@ public class PartitionSenderRootExec extends BaseRootExec { public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - super(context, new OperatorContext(operator, context, null, false), operator); + super(context, context.newOperatorContext(operator, null, false), operator); this.incoming = incoming; this.operator = operator; this.context = context; @@ -141,8 +140,6 @@ public class PartitionSenderRootExec extends BaseRootExec { public boolean innerNext() { if (!ok) { - stop(); - return false; } @@ -322,17 +319,15 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public void stop() { + public void close() throws Exception { logger.debug("Partition sender stopping."); - super.stop(); + super.close(); ok = false; if (partitioner != null) { updateAggregateStats(); partitioner.clear(); } - oContext.close(); - incoming.cleanup(); } public void sendEmptyBatch(boolean isLast) { |