diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java | 33 |
1 files changed, 22 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 799bf7fb2..b875b66b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,7 +34,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; @@ -97,7 +96,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect @Override protected IterOutcome doWork() { int incomingRecordCount = incoming.getRecordCount(); - int copiedRecords = copier.copyRecords(0, incomingRecordCount); + int copiedRecords; + try { + copiedRecords = copier.copyRecords(0, incomingRecordCount); + } catch (SchemaChangeException e) { + throw new IllegalStateException(e); + } if (copiedRecords < incomingRecordCount) { for(VectorWrapper<?> v : container){ @@ -136,9 +140,13 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect int recordCount = incoming.getRecordCount(); int remainingRecordCount = incoming.getRecordCount() - remainderIndex; int copiedRecords; - while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) { - logger.debug("Copied zero records. Retrying"); - container.zeroVectors(); + try { + while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) { + logger.debug("Copied zero records. Retrying"); + container.zeroVectors(); + } + } catch (SchemaChangeException e) { + throw new IllegalStateException(e); } /* @@ -222,7 +230,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); for(VectorWrapper<?> vv : incoming){ - TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); + vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); } try { @@ -230,6 +238,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect CopyUtil.generateCopies(cg.getRoot(), incoming, false); Copier copier = context.getImplementationClass(cg); copier.setupRemover(context, incoming, this); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); return copier; } catch (ClassTransformationException | IOException e) { @@ -245,6 +256,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{ for(VectorWrapper<?> vv : batch){ + @SuppressWarnings("resource") ValueVector v = vv.getValueVectors()[0]; v.makeTransferPair(container.addOrGet(v.getField(), callBack)); } @@ -252,9 +264,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect try { final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry(), context.getOptions()); CopyUtil.generateCopies(cg.getRoot(), batch, true); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); Copier copier = context.getImplementationClass(cg); copier.setupRemover(context, batch, outgoing); - return copier; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); @@ -265,7 +279,4 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public WritableBatch getWritableBatch() { return WritableBatch.get(this); } - - - } |