aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache
diff options
context:
space:
mode:
authorPaul Rogers <progers@cloudera.com>2019-01-28 22:04:31 -0800
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2019-02-08 11:13:22 +0200
commit304293a46e66ba27b6b38bbc2fef63743f78d598 (patch)
tree978c1cfc24b0753094cf91f85ff77c439e9a0c42 /exec/java-exec/src/main/java/org/apache
parent1fa12e71c45efd9d1dd8ff383588d30360753bac (diff)
DRILL-7024: Refactor ColumnWriter to simplify type-conversion shim
DRILL-7006 added a type conversion "shim" within the row set framework. Basically, we insert a "shim" column writer that takes data in one form (String, say), and does reader-specific conversions to a target format (INT, say). The code works fine, but the shim class ends up needing to override a bunch of methods which it then passes along to the base writer. This PR refactors the code so that the conversion shim is simpler. closes #1633
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java10
9 files changed, 41 insertions, 31 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
index 33a8eeb8c..19d86453d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java
@@ -162,7 +162,7 @@ public class ColumnBuilder {
(NullableVector) vector);
} else {
vectorState = SimpleVectorState.vectorState(columnSchema,
- colWriter.scalar(), vector);
+ colWriter.events(), vector);
}
// Create the column state which binds the vector and writer together.
@@ -261,7 +261,7 @@ public class ColumnBuilder {
offsetVectorState = new OffsetVectorState(
(((AbstractArrayWriter) writer.array()).offsetWriter()),
offsetVector,
- writer.array().entry());
+ writer.array().entry().events());
} else {
offsetVectorState = new NullVectorState();
}
@@ -397,7 +397,8 @@ public class ColumnBuilder {
// Create the list vector state that tracks the list vector lifecycle.
- final ListVectorState vectorState = new ListVectorState(listWriter, memberState.writer(), listVector);
+ final ListVectorState vectorState = new ListVectorState(listWriter,
+ memberState.writer().events(), listVector);
// Assemble it all into a union column state.
@@ -504,7 +505,7 @@ public class ColumnBuilder {
// For a repeated list, we only care about
final RepeatedListVectorState vectorState = new RepeatedListVectorState(
- arrayWriter.array(), vector);
+ arrayWriter, vector);
// Build the container that tracks the array contents
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
index 95f011a23..36b9db810 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java
@@ -22,9 +22,10 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
+import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
/**
* Represents the write-time state for a column including the writer and the (optional)
@@ -53,13 +54,13 @@ public abstract class ColumnState {
AbstractObjectWriter colWriter,
VectorState vectorState) {
super(loader, colWriter, vectorState);
- ScalarWriter scalarWriter;
+ WriterEvents scalarEvents;
if (colWriter.type() == ObjectType.ARRAY) {
- scalarWriter = writer.array().scalar();
+ scalarEvents = writer.array().entry().events();
} else {
- scalarWriter = writer.scalar();
+ scalarEvents = writer.events();
}
- scalarWriter.bindListener(this);
+ scalarEvents.bindListener(this);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
index 3488e7bb7..37104f437 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java
@@ -32,12 +32,12 @@ import org.apache.drill.exec.record.metadata.VariantMetadata;
import org.apache.drill.exec.record.metadata.VariantSchema;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.VariantWriter;
-import org.apache.drill.exec.vector.accessor.WriterPosition;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl;
import org.apache.drill.exec.vector.accessor.writer.SimpleListShim;
import org.apache.drill.exec.vector.accessor.writer.UnionVectorShim;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl;
+import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
import org.apache.drill.exec.vector.complex.ListVector;
import org.apache.drill.exec.vector.complex.UnionVector;
@@ -123,7 +123,7 @@ public class ListState extends ContainerState
memberVectorState = new NullVectorState();
}
- public ListVectorState(ListWriterImpl writer, WriterPosition elementWriter, ListVector vector) {
+ public ListVectorState(ListWriterImpl writer, WriterEvents elementWriter, ListVector vector) {
this.schema = writer.schema();
this.vector = vector;
bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
index b01292074..90dc3fe58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java
@@ -38,7 +38,7 @@ public class NullableVectorState implements VectorState {
this.schema = writer.schema();
this.vector = vector;
- this.writer = (NullableScalarWriter) writer.scalar();
+ this.writer = (NullableScalarWriter) writer.events();
bitsState = new IsSetVectorState(this.writer.bitsWriter(), vector.getBitsVector());
valuesState = SimpleVectorState.vectorState(this.writer.schema(),
this.writer.baseWriter(), vector.getValuesVector());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java
index ab64dded3..39d7d4423 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java
@@ -87,12 +87,12 @@ public class RepeatedListState extends ContainerState implements RepeatedListWri
private final RepeatedListVector vector;
private final OffsetVectorState offsetsState;
- public RepeatedListVectorState(ArrayWriter arrayWriter, RepeatedListVector vector) {
+ public RepeatedListVectorState(AbstractObjectWriter arrayWriter, RepeatedListVector vector) {
this.vector = vector;
- this.arrayWriter = arrayWriter;
+ this.arrayWriter = arrayWriter.array();
offsetsState = new OffsetVectorState(
- arrayWriter, vector.getOffsetVector(),
- arrayWriter.entryType() == null ? null : arrayWriter.array());
+ arrayWriter.events(), vector.getOffsetVector(),
+ this.arrayWriter.entryType() == null ? null : arrayWriter.events());
}
/**
@@ -105,7 +105,7 @@ public class RepeatedListState extends ContainerState implements RepeatedListWri
*/
public void updateChildWriter(AbstractObjectWriter childWriter) {
- offsetsState.setChildWriter(childWriter.array());
+ offsetsState.setChildWriter(childWriter.events());
}
@SuppressWarnings("unchecked")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
index 8dd4839ce..1d01b9d08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
/**
@@ -47,7 +47,7 @@ public class RepeatedVectorState implements VectorState {
// vector, and the scalar (value) portion of the array writer.
arrayWriter = (AbstractArrayWriter) writer;
- AbstractScalarWriter colWriter = (AbstractScalarWriter) writer.scalar();
+ AbstractScalarWriterImpl colWriter = (AbstractScalarWriterImpl) writer.entry().events();
valuesState = SimpleVectorState.vectorState(writer.schema(), colWriter, vector.getDataVector());
// Create the offsets state with the offset vector portion of the repeated
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
index e57373caa..538435375 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.accessor.WriterPosition;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter;
+import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
/**
* Base class for a single vector. Handles the bulk of work for that vector.
@@ -41,7 +42,7 @@ public abstract class SingleVectorState implements VectorState {
public abstract static class SimpleVectorState extends SingleVectorState {
- public SimpleVectorState(WriterPosition writer,
+ public SimpleVectorState(WriterEvents writer,
ValueVector mainVector) {
super(writer, mainVector);
}
@@ -74,7 +75,7 @@ public abstract class SingleVectorState implements VectorState {
public static class FixedWidthVectorState extends SimpleVectorState {
- public FixedWidthVectorState(WriterPosition writer, ValueVector mainVector) {
+ public FixedWidthVectorState(WriterEvents writer, ValueVector mainVector) {
super(writer, mainVector);
}
@@ -87,7 +88,7 @@ public abstract class SingleVectorState implements VectorState {
public static class IsSetVectorState extends FixedWidthVectorState {
- public IsSetVectorState(WriterPosition writer, ValueVector mainVector) {
+ public IsSetVectorState(WriterEvents writer, ValueVector mainVector) {
super(writer, mainVector);
}
@@ -112,7 +113,7 @@ public abstract class SingleVectorState implements VectorState {
private final ColumnMetadata schema;
- public VariableWidthVectorState(ColumnMetadata schema, WriterPosition writer, ValueVector mainVector) {
+ public VariableWidthVectorState(ColumnMetadata schema, WriterEvents writer, ValueVector mainVector) {
super(writer, mainVector);
this.schema = schema;
}
@@ -147,13 +148,13 @@ public abstract class SingleVectorState implements VectorState {
private WriterPosition childWriter;
- public OffsetVectorState(WriterPosition writer, ValueVector mainVector,
+ public OffsetVectorState(WriterEvents writer, ValueVector mainVector,
WriterPosition childWriter) {
super(writer, mainVector);
this.childWriter = childWriter;
}
- public void setChildWriter(WriterPosition childWriter) {
+ public void setChildWriter(WriterEvents childWriter) {
this.childWriter = childWriter;
}
@@ -224,11 +225,11 @@ public abstract class SingleVectorState implements VectorState {
}
}
- protected final WriterPosition writer;
+ protected final WriterEvents writer;
protected final ValueVector mainVector;
protected ValueVector backupVector;
- public SingleVectorState(WriterPosition writer, ValueVector mainVector) {
+ public SingleVectorState(WriterEvents writer, ValueVector mainVector) {
this.writer = writer;
this.mainVector = mainVector;
}
@@ -359,7 +360,7 @@ public abstract class SingleVectorState implements VectorState {
@Override
public boolean isProjected() { return true; }
- public static SimpleVectorState vectorState(ColumnMetadata schema, WriterPosition writer, ValueVector mainVector) {
+ public static SimpleVectorState vectorState(ColumnMetadata schema, WriterEvents writer, ValueVector mainVector) {
if (schema.isVariableWidth()) {
return new VariableWidthVectorState(schema, writer, mainVector);
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
index ff531f717..eace69ebb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter;
@@ -92,7 +91,7 @@ import org.apache.drill.exec.vector.complex.AbstractMapVector;
*/
public abstract class TupleState extends ContainerState
- implements TupleWriterListener {
+ implements AbstractTupleWriter.TupleWriterListener {
/**
* Represents a map column (either single or repeated). Includes maps that
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index a4946a96c..b59c17eeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.work.filter;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.SendingAccountor;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
@@ -172,9 +173,14 @@ public class RuntimeFilterRouter {
}
}
+ @SuppressWarnings("unchecked")
private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) {
targetOpVisitor.setCurrentFragment(wrapper.getNode());
- wrapper.getNode().getRoot().accept(targetOpVisitor, null);
+ try {
+ wrapper.getNode().getRoot().accept(targetOpVisitor, null);
+ } catch (Throwable e) {
+ throw UserException.systemError(e).build();
+ }
boolean contain = targetOpVisitor.isContain();
if (contain) {
return wrapper;
@@ -233,6 +239,7 @@ public class RuntimeFilterRouter {
return null;
}
+ @Override
public boolean isContain() {
return contain;
}
@@ -284,6 +291,7 @@ public class RuntimeFilterRouter {
return null;
}
+ @Override
public boolean isContain() {
return contain;
}