diff options
author | Paul Rogers <progers@cloudera.com> | 2019-01-28 22:04:31 -0800 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2019-02-08 11:13:22 +0200 |
commit | 304293a46e66ba27b6b38bbc2fef63743f78d598 (patch) | |
tree | 978c1cfc24b0753094cf91f85ff77c439e9a0c42 /exec/java-exec/src/main/java/org/apache | |
parent | 1fa12e71c45efd9d1dd8ff383588d30360753bac (diff) |
DRILL-7024: Refactor ColumnWriter to simplify type-conversion shim
DRILL-7006 added a type conversion "shim" within the row set framework. Basically, we insert a "shim" column writer that takes data in one form (String, say), and does reader-specific conversions to a target format (INT, say).
The code works fine, but the shim class ends up needing to override a bunch of methods which it then passes along to the base writer. This PR refactors the code so that the conversion shim is simpler.
closes #1633
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
9 files changed, 41 insertions, 31 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java index 33a8eeb8c..19d86453d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnBuilder.java @@ -162,7 +162,7 @@ public class ColumnBuilder { (NullableVector) vector); } else { vectorState = SimpleVectorState.vectorState(columnSchema, - colWriter.scalar(), vector); + colWriter.events(), vector); } // Create the column state which binds the vector and writer together. @@ -261,7 +261,7 @@ public class ColumnBuilder { offsetVectorState = new OffsetVectorState( (((AbstractArrayWriter) writer.array()).offsetWriter()), offsetVector, - writer.array().entry()); + writer.array().entry().events()); } else { offsetVectorState = new NullVectorState(); } @@ -397,7 +397,8 @@ public class ColumnBuilder { // Create the list vector state that tracks the list vector lifecycle. - final ListVectorState vectorState = new ListVectorState(listWriter, memberState.writer(), listVector); + final ListVectorState vectorState = new ListVectorState(listWriter, + memberState.writer().events(), listVector); // Assemble it all into a union column state. @@ -504,7 +505,7 @@ public class ColumnBuilder { // For a repeated list, we only care about final RepeatedListVectorState vectorState = new RepeatedListVectorState( - arrayWriter.array(), vector); + arrayWriter, vector); // Build the container that tracks the array contents diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java index 95f011a23..36b9db810 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ColumnState.java @@ -22,9 +22,10 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.WriterEvents; +import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener; /** * Represents the write-time state for a column including the writer and the (optional) @@ -53,13 +54,13 @@ public abstract class ColumnState { AbstractObjectWriter colWriter, VectorState vectorState) { super(loader, colWriter, vectorState); - ScalarWriter scalarWriter; + WriterEvents scalarEvents; if (colWriter.type() == ObjectType.ARRAY) { - scalarWriter = writer.array().scalar(); + scalarEvents = writer.array().entry().events(); } else { - scalarWriter = writer.scalar(); + scalarEvents = writer.events(); } - scalarWriter.bindListener(this); + scalarEvents.bindListener(this); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java index 3488e7bb7..37104f437 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/ListState.java @@ -32,12 +32,12 @@ import org.apache.drill.exec.record.metadata.VariantMetadata; import org.apache.drill.exec.record.metadata.VariantSchema; import org.apache.drill.exec.vector.accessor.ObjectWriter; import org.apache.drill.exec.vector.accessor.VariantWriter; -import org.apache.drill.exec.vector.accessor.WriterPosition; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.ListWriterImpl; import org.apache.drill.exec.vector.accessor.writer.SimpleListShim; import org.apache.drill.exec.vector.accessor.writer.UnionVectorShim; import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl; +import org.apache.drill.exec.vector.accessor.writer.WriterEvents; import org.apache.drill.exec.vector.complex.ListVector; import org.apache.drill.exec.vector.complex.UnionVector; @@ -123,7 +123,7 @@ public class ListState extends ContainerState memberVectorState = new NullVectorState(); } - public ListVectorState(ListWriterImpl writer, WriterPosition elementWriter, ListVector vector) { + public ListVectorState(ListWriterImpl writer, WriterEvents elementWriter, ListVector vector) { this.schema = writer.schema(); this.vector = vector; bitsVectorState = new IsSetVectorState(writer, vector.getBitsVector()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java index b01292074..90dc3fe58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/NullableVectorState.java @@ -38,7 +38,7 @@ public class NullableVectorState implements VectorState { this.schema = writer.schema(); this.vector = vector; - this.writer = (NullableScalarWriter) writer.scalar(); + this.writer = (NullableScalarWriter) writer.events(); bitsState = new IsSetVectorState(this.writer.bitsWriter(), vector.getBitsVector()); valuesState = SimpleVectorState.vectorState(this.writer.schema(), this.writer.baseWriter(), vector.getValuesVector()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java index ab64dded3..39d7d4423 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedListState.java @@ -87,12 +87,12 @@ public class RepeatedListState extends ContainerState implements RepeatedListWri private final RepeatedListVector vector; private final OffsetVectorState offsetsState; - public RepeatedListVectorState(ArrayWriter arrayWriter, RepeatedListVector vector) { + public RepeatedListVectorState(AbstractObjectWriter arrayWriter, RepeatedListVector vector) { this.vector = vector; - this.arrayWriter = arrayWriter; + this.arrayWriter = arrayWriter.array(); offsetsState = new OffsetVectorState( - arrayWriter, vector.getOffsetVector(), - arrayWriter.entryType() == null ? null : arrayWriter.array()); + arrayWriter.events(), vector.getOffsetVector(), + this.arrayWriter.entryType() == null ? null : arrayWriter.events()); } /** @@ -105,7 +105,7 @@ public class RepeatedListState extends ContainerState implements RepeatedListWri */ public void updateChildWriter(AbstractObjectWriter childWriter) { - offsetsState.setChildWriter(childWriter.array()); + offsetsState.setChildWriter(childWriter.events()); } @SuppressWarnings("unchecked") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java index 8dd4839ce..1d01b9d08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/RepeatedVectorState.java @@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ArrayWriter; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter; -import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl; import org.apache.drill.exec.vector.complex.RepeatedValueVector; /** @@ -47,7 +47,7 @@ public class RepeatedVectorState implements VectorState { // vector, and the scalar (value) portion of the array writer. arrayWriter = (AbstractArrayWriter) writer; - AbstractScalarWriter colWriter = (AbstractScalarWriter) writer.scalar(); + AbstractScalarWriterImpl colWriter = (AbstractScalarWriterImpl) writer.entry().events(); valuesState = SimpleVectorState.vectorState(writer.schema(), colWriter, vector.getDataVector()); // Create the offsets state with the offset vector portion of the repeated diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java index e57373caa..538435375 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/SingleVectorState.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.accessor.WriterPosition; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.OffsetVectorWriter; +import org.apache.drill.exec.vector.accessor.writer.WriterEvents; /** * Base class for a single vector. Handles the bulk of work for that vector. @@ -41,7 +42,7 @@ public abstract class SingleVectorState implements VectorState { public abstract static class SimpleVectorState extends SingleVectorState { - public SimpleVectorState(WriterPosition writer, + public SimpleVectorState(WriterEvents writer, ValueVector mainVector) { super(writer, mainVector); } @@ -74,7 +75,7 @@ public abstract class SingleVectorState implements VectorState { public static class FixedWidthVectorState extends SimpleVectorState { - public FixedWidthVectorState(WriterPosition writer, ValueVector mainVector) { + public FixedWidthVectorState(WriterEvents writer, ValueVector mainVector) { super(writer, mainVector); } @@ -87,7 +88,7 @@ public abstract class SingleVectorState implements VectorState { public static class IsSetVectorState extends FixedWidthVectorState { - public IsSetVectorState(WriterPosition writer, ValueVector mainVector) { + public IsSetVectorState(WriterEvents writer, ValueVector mainVector) { super(writer, mainVector); } @@ -112,7 +113,7 @@ public abstract class SingleVectorState implements VectorState { private final ColumnMetadata schema; - public VariableWidthVectorState(ColumnMetadata schema, WriterPosition writer, ValueVector mainVector) { + public VariableWidthVectorState(ColumnMetadata schema, WriterEvents writer, ValueVector mainVector) { super(writer, mainVector); this.schema = schema; } @@ -147,13 +148,13 @@ public abstract class SingleVectorState implements VectorState { private WriterPosition childWriter; - public OffsetVectorState(WriterPosition writer, ValueVector mainVector, + public OffsetVectorState(WriterEvents writer, ValueVector mainVector, WriterPosition childWriter) { super(writer, mainVector); this.childWriter = childWriter; } - public void setChildWriter(WriterPosition childWriter) { + public void setChildWriter(WriterEvents childWriter) { this.childWriter = childWriter; } @@ -224,11 +225,11 @@ public abstract class SingleVectorState implements VectorState { } } - protected final WriterPosition writer; + protected final WriterEvents writer; protected final ValueVector mainVector; protected ValueVector backupVector; - public SingleVectorState(WriterPosition writer, ValueVector mainVector) { + public SingleVectorState(WriterEvents writer, ValueVector mainVector) { this.writer = writer; this.mainVector = mainVector; } @@ -359,7 +360,7 @@ public abstract class SingleVectorState implements VectorState { @Override public boolean isProjected() { return true; } - public static SimpleVectorState vectorState(ColumnMetadata schema, WriterPosition writer, ValueVector mainVector) { + public static SimpleVectorState vectorState(ColumnMetadata schema, WriterEvents writer, ValueVector mainVector) { if (schema.isVariableWidth()) { return new VariableWidthVectorState(schema, writer, mainVector); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java index ff531f717..eace69ebb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/TupleState.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.record.metadata.TupleSchema; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ObjectWriter; import org.apache.drill.exec.vector.accessor.TupleWriter; -import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.AbstractObjectWriter; import org.apache.drill.exec.vector.accessor.writer.AbstractTupleWriter; @@ -92,7 +91,7 @@ import org.apache.drill.exec.vector.complex.AbstractMapVector; */ public abstract class TupleState extends ContainerState - implements TupleWriterListener { + implements AbstractTupleWriter.TupleWriterListener { /** * Represents a map column (either single or repeated). Includes maps that diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java index a4946a96c..b59c17eeb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.work.filter; import org.apache.commons.collections.CollectionUtils; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ops.SendingAccountor; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; @@ -172,9 +173,14 @@ public class RuntimeFilterRouter { } } + @SuppressWarnings("unchecked") private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) { targetOpVisitor.setCurrentFragment(wrapper.getNode()); - wrapper.getNode().getRoot().accept(targetOpVisitor, null); + try { + wrapper.getNode().getRoot().accept(targetOpVisitor, null); + } catch (Throwable e) { + throw UserException.systemError(e).build(); + } boolean contain = targetOpVisitor.isContain(); if (contain) { return wrapper; @@ -233,6 +239,7 @@ public class RuntimeFilterRouter { return null; } + @Override public boolean isContain() { return contain; } @@ -284,6 +291,7 @@ public class RuntimeFilterRouter { return null; } + @Override public boolean isContain() { return contain; } |