aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java33
1 files changed, 22 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 799bf7fb2..b875b66b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,7 +34,6 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
@@ -97,7 +96,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
@Override
protected IterOutcome doWork() {
int incomingRecordCount = incoming.getRecordCount();
- int copiedRecords = copier.copyRecords(0, incomingRecordCount);
+ int copiedRecords;
+ try {
+ copiedRecords = copier.copyRecords(0, incomingRecordCount);
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException(e);
+ }
if (copiedRecords < incomingRecordCount) {
for(VectorWrapper<?> v : container){
@@ -136,9 +140,13 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
int recordCount = incoming.getRecordCount();
int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
int copiedRecords;
- while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) {
- logger.debug("Copied zero records. Retrying");
- container.zeroVectors();
+ try {
+ while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) {
+ logger.debug("Copied zero records. Retrying");
+ container.zeroVectors();
+ }
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException(e);
}
/*
@@ -222,7 +230,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
for(VectorWrapper<?> vv : incoming){
- TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
+ vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
}
try {
@@ -230,6 +238,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
CopyUtil.generateCopies(cg.getRoot(), incoming, false);
Copier copier = context.getImplementationClass(cg);
copier.setupRemover(context, incoming, this);
+ cg.plainJavaCapable(true);
+ // Uncomment out this line to debug the generated code.
+// cg.saveCodeForDebugging(true);
return copier;
} catch (ClassTransformationException | IOException e) {
@@ -245,6 +256,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{
for(VectorWrapper<?> vv : batch){
+ @SuppressWarnings("resource")
ValueVector v = vv.getValueVectors()[0];
v.makeTransferPair(container.addOrGet(v.getField(), callBack));
}
@@ -252,9 +264,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
try {
final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry(), context.getOptions());
CopyUtil.generateCopies(cg.getRoot(), batch, true);
+ cg.plainJavaCapable(true);
+ // Uncomment out this line to debug the generated code.
+// cg.saveCodeForDebugging(true);
Copier copier = context.getImplementationClass(cg);
copier.setupRemover(context, batch, outgoing);
-
return copier;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
@@ -265,7 +279,4 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
-
-
-
}