aboutsummaryrefslogtreecommitdiff
path: root/exec/vector/src/main/java/org/apache/drill
diff options
context:
space:
mode:
Diffstat (limited to 'exec/vector/src/main/java/org/apache/drill')
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java3
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java34
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java9
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java38
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java19
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java13
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java37
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java132
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java142
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java39
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java (renamed from exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConvertor.java)88
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java2
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java2
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java69
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java11
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java6
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java9
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java23
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java36
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java19
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java54
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java4
22 files changed, 386 insertions, 403 deletions
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java
index 02efd6def..691dcf168 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.vector.accessor;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.accessor.writer.ConcreteWriter;
/**
* Create a column type converter for the given column and base writer.
@@ -36,5 +35,5 @@ public interface ColumnConversionFactory {
* @return a new scalar writer to insert between the client and
* the base vector
*/
- ConcreteWriter newWriter(ColumnMetadata colDefn, ConcreteWriter baseWriter);
+ ScalarWriter newWriter(ColumnMetadata colDefn, ScalarWriter baseWriter);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
index 5c2fde1b6..5117782dd 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java
@@ -18,9 +18,6 @@
package org.apache.drill.exec.vector.accessor;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener;
-import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener;
-import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener;
/**
* Generic information about a column writer including:
@@ -33,36 +30,7 @@ import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener
* testing.</li>
*/
-public interface ColumnWriter extends WriterPosition {
-
- interface TupleListenable {
-
- /**
- * Bind a listener to the underlying map or map array column. Not valid if the
- * underlying writer is a scalar or scalar array.
- *
- * @param listener
- * the tuple listener to bind
- */
-
- void bindListener(TupleWriterListener listener);
- }
-
- interface ScalarListenable {
- /**
- * Bind a listener to the underlying scalar column, or array of scalar
- * columns. Not valid if the underlying writer is a map or array of maps.
- *
- * @param listener
- * the column listener to bind
- */
-
- void bindListener(ColumnWriterListener listener);
- }
-
- interface VariantListenable {
- void bindListener(VariantWriterListener listener);
- }
+public interface ColumnWriter {
/**
* Return the object (structure) type of this writer.
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
index af7df6e88..fffd4e588 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.vector.accessor;
+import org.apache.drill.exec.vector.accessor.writer.WriterEvents;
+
/**
* Represents a column within a tuple. A column can be an array, a scalar or a
* tuple. Each has an associated column metadata (schema) and a writer. The
@@ -49,4 +51,11 @@ public interface ObjectWriter extends ColumnWriter {
ArrayWriter array();
VariantWriter variant();
+
+ /**
+ * The internal state behind this writer. To be used only by the
+ * implementation, not by the client.
+ */
+
+ WriterEvents events();
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
index 87c798890..db86570d1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
@@ -21,8 +21,6 @@ import java.math.BigDecimal;
import org.joda.time.Period;
-import org.apache.drill.exec.vector.accessor.ColumnWriter.ScalarListenable;
-
/**
* Represents a scalar value: a required column, a nullable column,
* or one element within an array of scalars.
@@ -44,41 +42,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriter.ScalarListenable;
* {@see ScalarElementReader}
*/
-public interface ScalarWriter extends ColumnWriter, ScalarListenable {
-
- /**
- * Listener (callback) for vector overflow events. To be optionally
- * implemented and bound by the client code of the writer. If no
- * listener is bound, and a vector overflows, then an exception is
- * thrown.
- */
-
- public interface ColumnWriterListener {
-
- /**
- * Alert the listener that a vector has overflowed. Upon return,
- * all writers must have a new set of buffers available, ready
- * to accept the in-flight value that triggered the overflow.
- *
- * @param writer the writer that triggered the overflow
- */
-
- void overflowed(ScalarWriter writer);
-
- /**
- * A writer wants to expand its vector. Allows the listener to
- * either allow the growth, or trigger and overflow to limit
- * batch size.
- *
- * @param writer the writer that wishes to grow its vector
- * @param delta the amount by which the vector is to grow
- * @return true if the vector can be grown, false if the writer
- * should instead trigger an overflow by calling
- * <tt>overflowed()</tt>
- */
-
- boolean canExpand(ScalarWriter writer, int delta);
- }
+public interface ScalarWriter extends ColumnWriter {
/**
* Describe the type of the value. This is a compression of the
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
index 6f5d17931..05ff2fc5a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java
@@ -21,7 +21,6 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.ProjectionType;
import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.vector.accessor.ColumnWriter.TupleListenable;
/**
* Writer for a tuple. A tuple is composed of columns with a fixed order and
@@ -52,23 +51,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriter.TupleListenable;
* @see {@link SingleMapWriter}, the class which this class replaces
*/
-public interface TupleWriter extends ColumnWriter, TupleListenable {
-
- /**
- * Listener (callback) to handle requests to add a new column to a tuple (row
- * or map). Implemented and bound by the client code that creates or uses the
- * tuple writer. If no listener is bound, then an attempt to add a column
- * throws an exception.
- */
-
- interface TupleWriterListener {
-
- ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column);
-
- ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
-
- ProjectionType projectionType(String columnName);
- }
+public interface TupleWriter extends ColumnWriter {
/**
* Unchecked exception thrown when attempting to access a column writer by
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index d7b97e110..585696f52 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.vector.accessor.writer;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
@@ -95,7 +96,7 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
public static class ArrayObjectWriter extends AbstractObjectWriter {
- private AbstractArrayWriter arrayWriter;
+ private final AbstractArrayWriter arrayWriter;
public ArrayObjectWriter(AbstractArrayWriter arrayWriter) {
this.arrayWriter = arrayWriter;
@@ -105,6 +106,9 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
public ArrayWriter array() { return arrayWriter; }
@Override
+ public ColumnWriter writer() { return arrayWriter; }
+
+ @Override
public WriterEvents events() { return arrayWriter; }
@Override
@@ -266,6 +270,12 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
}
@Override
+ public void bindListener(ColumnWriterListener listener) {
+ elementObjWriter.events().bindListener(listener);
+ offsetsWriter.bindListener(listener);
+ }
+
+ @Override
public ObjectType type() { return ObjectType.ARRAY; }
@Override
@@ -342,6 +352,7 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
public OffsetVectorWriter offsetWriter() { return offsetsWriter; }
+ @Override
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
index 87a181b26..527a69cdc 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java
@@ -38,9 +38,6 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
public abstract class AbstractObjectWriter implements ObjectWriter {
@Override
- public ColumnMetadata schema() { return baseWriter().schema(); }
-
- @Override
public ScalarWriter scalar() {
throw new UnsupportedOperationException();
}
@@ -60,42 +57,24 @@ public abstract class AbstractObjectWriter implements ObjectWriter {
throw new UnsupportedOperationException();
}
- public abstract WriterEvents events();
-
- public ColumnWriter baseWriter() {
- return (ColumnWriter) events();
- }
-
+ public abstract ColumnWriter writer();
@Override
- public ObjectType type() { return baseWriter().type(); }
+ public abstract WriterEvents events();
@Override
- public boolean nullable() { return baseWriter().nullable(); }
+ public ColumnMetadata schema() { return writer().schema(); }
@Override
- public void setNull() {
- baseWriter().setNull();
- }
+ public ObjectType type() { return writer().type(); }
@Override
- public void setObject(Object value) {
- baseWriter().setObject(value);
- }
-
- public abstract void dump(HierarchicalFormatter format);
+ public boolean nullable() { return writer().nullable(); }
@Override
- public int rowStartIndex() {
- return baseWriter().rowStartIndex();
- }
+ public void setNull() { writer().setNull(); }
@Override
- public int lastWriteIndex() {
- return baseWriter().lastWriteIndex();
- }
+ public void setObject(Object value) { writer().setObject(value); }
- @Override
- public int writeIndex() {
- return baseWriter().writeIndex();
- }
+ public abstract void dump(HierarchicalFormatter format);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
index 9c2e9861d..a596e14af 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java
@@ -17,109 +17,53 @@
*/
package org.apache.drill.exec.vector.accessor.writer;
-import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
-import org.apache.drill.exec.vector.accessor.ObjectType;
+import java.math.BigDecimal;
+
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
+import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener;
+import org.joda.time.Period;
/**
- * Column writer implementation that acts as the basis for the
- * generated, vector-specific implementations. All set methods
- * throw an exception; subclasses simply override the supported
- * method(s).
+ * Base class for concrete scalar column writers including actual vector
+ * writers, wrappers for nullable types, and shims used to convert types.
*/
-public abstract class AbstractScalarWriter extends ConcreteWriter {
-
- public static class ScalarObjectWriter extends AbstractObjectWriter {
-
- private ConcreteWriter scalarWriter;
-
- public ScalarObjectWriter(ConcreteWriter scalarWriter) {
- final ColumnMetadata metadata = scalarWriter.schema();
- final ColumnConversionFactory factory = metadata.typeConverter();
- if (factory == null) {
- this.scalarWriter = scalarWriter;
- } else {
- this.scalarWriter = factory.newWriter(metadata, scalarWriter);
- }
- }
-
- @Override
- public ScalarWriter scalar() { return scalarWriter; }
-
- @Override
- public WriterEvents events() { return scalarWriter; }
-
- @Override
- public void dump(HierarchicalFormatter format) {
- format
- .startObject(this)
- .attribute("scalarWriter");
- scalarWriter.dump(format);
- format.endObject();
- }
- }
-
- protected ColumnMetadata schema;
-
- /**
- * Indicates the position in the vector to write. Set via an object so that
- * all writers (within the same subtree) can agree on the write position.
- * For example, all top-level, simple columns see the same row index.
- * All columns within a repeated map see the same (inner) index, etc.
- */
-
- protected ColumnWriterIndex vectorIndex;
+public abstract class AbstractScalarWriter implements ScalarWriter {
@Override
- public ObjectType type() { return ObjectType.SCALAR; }
-
- public void bindSchema(ColumnMetadata schema) {
- this.schema = schema;
- }
-
- @Override
- public void bindIndex(ColumnWriterIndex vectorIndex) {
- this.vectorIndex = vectorIndex;
- }
-
- @Override
- public int rowStartIndex() {
- return vectorIndex.rowStartIndex();
+ public void setObject(Object value) {
+ if (value == null) {
+ setNull();
+ } else if (value instanceof Integer) {
+ setInt((Integer) value);
+ } else if (value instanceof Long) {
+ setLong((Long) value);
+ } else if (value instanceof String) {
+ setString((String) value);
+ } else if (value instanceof BigDecimal) {
+ setDecimal((BigDecimal) value);
+ } else if (value instanceof Period) {
+ setPeriod((Period) value);
+ } else if (value instanceof byte[]) {
+ final byte[] bytes = (byte[]) value;
+ setBytes(bytes, bytes.length);
+ } else if (value instanceof Byte) {
+ setInt((Byte) value);
+ } else if (value instanceof Short) {
+ setInt((Short) value);
+ } else if (value instanceof Double) {
+ setDouble((Double) value);
+ } else if (value instanceof Float) {
+ setDouble((Float) value);
+ } else {
+ throw conversionError(value.getClass().getSimpleName());
+ }
}
- @Override
- public int writeIndex() {
- return vectorIndex.vectorIndex();
+ protected UnsupportedConversionError conversionError(String javaType) {
+ return UnsupportedConversionError.writeError(schema(), javaType);
}
- @Override
- public ColumnMetadata schema() { return schema; }
-
- public abstract BaseDataValueVector vector();
-
- @Override
- public void startWrite() { }
-
- @Override
- public void startRow() { }
-
- @Override
- public void endArrayValue() { }
-
- @Override
- public void saveRow() { }
-
- @Override
- public void dump(HierarchicalFormatter format) {
- format
- .startObject(this)
- .attributeIdentity("vector", vector())
- .attribute("schema", vector().getField())
- .endObject();
- }
+ public void bindListener(ColumnWriterListener listener) { }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
new file mode 100644
index 000000000..c1306b618
--- /dev/null
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor.writer;
+
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.accessor.ColumnConversionFactory;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.ObjectType;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
+
+/**
+ * Column writer implementation that acts as the basis for the
+ * generated, vector-specific implementations. All set methods
+ * throw an exception; subclasses simply override the supported
+ * method(s).
+ */
+
+public abstract class AbstractScalarWriterImpl extends AbstractScalarWriter implements WriterEvents {
+
+ /**
+ * Wraps a scalar writer and its event handler to provide a uniform
+ * JSON-like interface for all writer types.
+ * <p>
+ * The client sees only the scalar writer API. But, internals need
+ * visibility into a rather complex set of events to orchestrate
+ * vector events: mostly sent to the writer, but some times sent
+ * from the writer, such as vector overflow. Separating the two
+ * concepts makes it easier to add type-conversion shims on top of
+ * the actual vector writer.
+ */
+ public static class ScalarObjectWriter extends AbstractObjectWriter {
+
+ private final WriterEvents writerEvents;
+ private ScalarWriter scalarWriter;
+
+ public ScalarObjectWriter(AbstractScalarWriterImpl scalarWriter) {
+ final ColumnMetadata metadata = scalarWriter.schema();
+ final ColumnConversionFactory factory = metadata.typeConverter();
+ writerEvents = scalarWriter;
+ if (factory == null) {
+ this.scalarWriter = scalarWriter;
+ } else {
+ this.scalarWriter = factory.newWriter(metadata, scalarWriter);
+ }
+ }
+
+ @Override
+ public ScalarWriter scalar() { return scalarWriter; }
+
+ @Override
+ public ColumnWriter writer() { return scalarWriter; }
+
+ @Override
+ public WriterEvents events() { return writerEvents; }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attribute("scalarWriter");
+ writerEvents.dump(format);
+ format.endObject();
+ }
+ }
+
+ protected ColumnMetadata schema;
+
+ /**
+ * Indicates the position in the vector to write. Set via an object so that
+ * all writers (within the same subtree) can agree on the write position.
+ * For example, all top-level, simple columns see the same row index.
+ * All columns within a repeated map see the same (inner) index, etc.
+ */
+
+ protected ColumnWriterIndex vectorIndex;
+
+ @Override
+ public ObjectType type() { return ObjectType.SCALAR; }
+
+ public void bindSchema(ColumnMetadata schema) {
+ this.schema = schema;
+ }
+
+ @Override
+ public void bindIndex(ColumnWriterIndex vectorIndex) {
+ this.vectorIndex = vectorIndex;
+ }
+
+ @Override
+ public int rowStartIndex() {
+ return vectorIndex.rowStartIndex();
+ }
+
+ @Override
+ public int writeIndex() {
+ return vectorIndex.vectorIndex();
+ }
+
+ @Override
+ public ColumnMetadata schema() { return schema; }
+
+ public abstract BaseDataValueVector vector();
+
+ @Override
+ public void startWrite() { }
+
+ @Override
+ public void startRow() { }
+
+ @Override
+ public void endArrayValue() { }
+
+ @Override
+ public void saveRow() { }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format
+ .startObject(this)
+ .attributeIdentity("vector", vector())
+ .attribute("schema", vector().getField())
+ .endObject();
+ }
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
index aee806df8..006608125 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.ProjectionType;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
@@ -104,13 +105,16 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
public static class TupleObjectWriter extends AbstractObjectWriter {
- private AbstractTupleWriter tupleWriter;
+ private final AbstractTupleWriter tupleWriter;
public TupleObjectWriter(AbstractTupleWriter tupleWriter) {
this.tupleWriter = tupleWriter;
}
@Override
+ public ColumnWriter writer() { return tupleWriter; }
+
+ @Override
public TupleWriter tuple() { return tupleWriter; }
@Override
@@ -126,11 +130,27 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
}
}
+ /**
+ * Listener (callback) to handle requests to add a new column to a tuple (row
+ * or map). Implemented and bound by the client code that creates or uses the
+ * tuple writer. If no listener is bound, then an attempt to add a column
+ * throws an exception.
+ */
+
+ public static interface TupleWriterListener {
+
+ ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column);
+
+ ObjectWriter addColumn(TupleWriter tuple, MaterializedField field);
+
+ ProjectionType projectionType(String columnName);
+ }
+
protected final TupleMetadata tupleSchema;
protected final List<AbstractObjectWriter> writers;
protected ColumnWriterIndex vectorIndex;
protected ColumnWriterIndex childIndex;
- protected TupleWriterListener listener;
+ protected AbstractTupleWriter.TupleWriterListener listener;
protected State state = State.IDLE;
protected AbstractTupleWriter(TupleMetadata schema, List<AbstractObjectWriter> writers) {
@@ -174,7 +194,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
public int addColumnWriter(AbstractObjectWriter colWriter) {
assert writers.size() == tupleSchema.size();
- int colIndex = tupleSchema.addColumn(colWriter.schema());
+ final int colIndex = tupleSchema.addColumn(colWriter.schema());
writers.add(colWriter);
colWriter.events().bindIndex(childIndex);
if (state != State.IDLE) {
@@ -197,7 +217,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
if (listener == null) {
throw new UnsupportedOperationException("addColumn");
}
- AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column);
+ final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column);
return addColumnWriter(colWriter);
}
@@ -206,7 +226,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
if (listener == null) {
throw new UnsupportedOperationException("addColumn");
}
- AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field);
+ final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field);
return addColumnWriter(colWriter);
}
@@ -317,7 +337,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
@Override
public ObjectWriter column(String colName) {
- int index = tupleSchema.index(colName);
+ final int index = tupleSchema.index(colName);
if (index == -1) {
throw new UndefinedColumnException(colName);
}
@@ -331,7 +351,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
@Override
public void setObject(Object value) {
- Object values[] = (Object[]) value;
+ final Object values[] = (Object[]) value;
if (values.length != tupleSchema.size()) {
throw new IllegalArgumentException(
String.format("Map %s has %d columns, but value array has " +
@@ -404,11 +424,14 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents {
return vectorIndex.vectorIndex();
}
- @Override
public void bindListener(TupleWriterListener listener) {
this.listener = listener;
}
+ @Override
+ public void bindListener(ColumnWriterListener listener) { }
+
+ @Override
public void dump(HierarchicalFormatter format) {
format
.startObject(this)
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConvertor.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java
index 0d0bc8803..55cd991a7 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConvertor.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java
@@ -20,11 +20,9 @@ package org.apache.drill.exec.vector.accessor.writer;
import java.math.BigDecimal;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
-import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.ValueType;
-import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.joda.time.Period;
/**
@@ -38,20 +36,12 @@ import org.joda.time.Period;
* for an int column in the case above.
*/
-// TODO: This organization works fine, but is a bit heavy-weight.
-// It may be time to think about separating the pure writer aspect of
-// a column writer from its plumbing aspects. That is, the base
-// ConcreteWriter class combines the public API (ScalarWriter) with
-// the internal implementation (WriterEvents) into a single class.
-// Might be worth using composition rather than inheritance to keep
-// these aspects distinct.
+public class AbstractWriteConverter extends AbstractScalarWriter {
-public class AbstractWriteConvertor extends ConcreteWriter {
+ private final ScalarWriter baseWriter;
- private final ConcreteWriter baseWriter;
-
- public AbstractWriteConvertor(ScalarWriter baseWriter) {
- this.baseWriter = (ConcreteWriter) baseWriter;
+ public AbstractWriteConverter(ScalarWriter baseWriter) {
+ this.baseWriter = baseWriter;
}
@Override
@@ -60,31 +50,6 @@ public class AbstractWriteConvertor extends ConcreteWriter {
}
@Override
- public int lastWriteIndex() {
- return baseWriter.lastWriteIndex();
- }
-
- @Override
- public void restartRow() {
- baseWriter.restartRow();
- }
-
- @Override
- public void endWrite() {
- baseWriter.endWrite();
- }
-
- @Override
- public void preRollover() {
- baseWriter.preRollover();
- }
-
- @Override
- public void postRollover() {
- baseWriter.postRollover();
- }
-
- @Override
public ObjectType type() {
return baseWriter.type();
}
@@ -105,46 +70,6 @@ public class AbstractWriteConvertor extends ConcreteWriter {
}
@Override
- public int rowStartIndex() {
- return baseWriter.rowStartIndex();
- }
-
- @Override
- public int writeIndex() {
- return baseWriter.writeIndex();
- }
-
- @Override
- public void bindListener(ColumnWriterListener listener) {
- baseWriter.bindListener(listener);
- }
-
- @Override
- public void bindIndex(ColumnWriterIndex index) {
- baseWriter.bindIndex(index);
- }
-
- @Override
- public void startWrite() {
- baseWriter.startWrite();
- }
-
- @Override
- public void startRow() {
- baseWriter.startRow();
- }
-
- @Override
- public void endArrayValue() {
- baseWriter.endArrayValue();
- }
-
- @Override
- public void saveRow() {
- baseWriter.saveRow();
- }
-
- @Override
public void setInt(int value) {
baseWriter.setInt(value);
}
@@ -178,9 +103,4 @@ public class AbstractWriteConvertor extends ConcreteWriter {
public void setPeriod(Period value) {
baseWriter.setPeriod(value);
}
-
- @Override
- public void dump(HierarchicalFormatter format) {
- baseWriter.dump(format);
- }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
index 7e121495f..e2e63d2ee 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
@@ -129,7 +129,7 @@ import io.netty.buffer.DrillBuf;
* a roll-over when a vector overflows.
*/
-public abstract class BaseScalarWriter extends AbstractScalarWriter {
+public abstract class BaseScalarWriter extends AbstractScalarWriterImpl {
public static final int MIN_BUFFER_SIZE = 256;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
index 6ee5bbcf7..206a0a66a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java
@@ -26,7 +26,7 @@ import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.ColumnAccessorUtils;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl.ScalarObjectWriter;
import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter;
import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java
deleted file mode 100644
index 549431f76..000000000
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.vector.accessor.writer;
-
-import java.math.BigDecimal;
-
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.drill.exec.vector.accessor.UnsupportedConversionError;
-import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
-import org.joda.time.Period;
-
-/**
- * Base class for concrete scalar column writers including actual vector
- * writers, wrappers for nullable types, and shims used to convert types.
- */
-
-public abstract class ConcreteWriter implements ScalarWriter, WriterEvents {
-
- @Override
- public void setObject(Object value) {
- if (value == null) {
- setNull();
- } else if (value instanceof Integer) {
- setInt((Integer) value);
- } else if (value instanceof Long) {
- setLong((Long) value);
- } else if (value instanceof String) {
- setString((String) value);
- } else if (value instanceof BigDecimal) {
- setDecimal((BigDecimal) value);
- } else if (value instanceof Period) {
- setPeriod((Period) value);
- } else if (value instanceof byte[]) {
- final byte[] bytes = (byte[]) value;
- setBytes(bytes, bytes.length);
- } else if (value instanceof Byte) {
- setInt((Byte) value);
- } else if (value instanceof Short) {
- setInt((Short) value);
- } else if (value instanceof Double) {
- setDouble((Double) value);
- } else if (value instanceof Float) {
- setDouble((Float) value);
- } else {
- throw conversionError(value.getClass().getSimpleName());
- }
- }
-
- protected UnsupportedConversionError conversionError(String javaType) {
- return UnsupportedConversionError.writeError(schema(), javaType);
- }
-
- abstract void dump(HierarchicalFormatter format);
-}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java
index 1167d3385..d8c8947ca 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -45,6 +46,9 @@ public class EmptyListShim implements UnionShim {
public void bindIndex(ColumnWriterIndex index) { }
@Override
+ public void bindListener(ColumnWriterListener listener) { }
+
+ @Override
public void startWrite() { }
@Override
@@ -127,6 +131,9 @@ public class EmptyListShim implements UnionShim {
public int rowStartIndex() { return 0; }
@Override
+ public int writeIndex() { return 0; }
+
+ @Override
public void addMember(AbstractObjectWriter colWriter) {
// This shim has no types. If this is called, then the shim replacement
@@ -135,4 +142,8 @@ public class EmptyListShim implements UnionShim {
throw new UnsupportedOperationException();
}
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this).endObject();
+ }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index 75ef57fc4..e9fe11ae4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -23,12 +23,12 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.accessor.ColumnAccessors.UInt1ColumnWriter;
-import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ValueType;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.joda.time.Period;
-public class NullableScalarWriter extends AbstractScalarWriter {
+public class NullableScalarWriter extends AbstractScalarWriterImpl {
public static final class ChildIndex implements ColumnWriterIndex {
@@ -93,7 +93,7 @@ public class NullableScalarWriter extends AbstractScalarWriter {
@Override
public void bindIndex(ColumnWriterIndex index) {
writerIndex = index;
- ColumnWriterIndex childIndex = new ChildIndex(index);
+ final ColumnWriterIndex childIndex = new ChildIndex(index);
isSetWriter.bindIndex(childIndex);
baseWriter.bindIndex(childIndex);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
index f271bfab0..c761a95de 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
@@ -22,8 +22,9 @@ import java.math.BigDecimal;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl.ScalarObjectWriter;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
import org.joda.time.Period;
@@ -60,7 +61,7 @@ public class ScalarArrayWriter extends BaseArrayWriter {
public final void nextElement() { next(); }
}
- private final ConcreteWriter elementWriter;
+ private final ScalarWriter elementWriter;
public ScalarArrayWriter(ColumnMetadata schema,
RepeatedValueVector vector, BaseScalarWriter elementWriter) {
@@ -70,7 +71,7 @@ public class ScalarArrayWriter extends BaseArrayWriter {
// Save the writer from the scalar object writer created above
// which may have wrapped the element writer in a type convertor.
- this.elementWriter = (ConcreteWriter) elementObjWriter.scalar();
+ this.elementWriter = elementObjWriter.scalar();
}
public static ArrayObjectWriter build(ColumnMetadata schema,
@@ -83,7 +84,7 @@ public class ScalarArrayWriter extends BaseArrayWriter {
public void bindIndex(ColumnWriterIndex index) {
elementIndex = new ScalarElementWriterIndex();
super.bindIndex(index);
- elementWriter.bindIndex(elementIndex);
+ elementObjWriter.events().bindIndex(elementIndex);
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java
index 6dd947503..66b81f976 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java
@@ -21,8 +21,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim;
-
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
@@ -56,6 +56,11 @@ public class SimpleListShim implements UnionShim {
}
@Override
+ public void bindListener(ColumnWriterListener listener) {
+ colWriter.events().bindListener(listener);
+ }
+
+ @Override
public boolean hasType(MinorType type) {
return type == colWriter.schema().type();
}
@@ -185,11 +190,23 @@ public class SimpleListShim implements UnionShim {
@Override
public int lastWriteIndex() {
- return colWriter.lastWriteIndex();
+ return events().lastWriteIndex();
}
@Override
public int rowStartIndex() {
- return colWriter.rowStartIndex();
+ return events().rowStartIndex();
+ }
+
+ @Override
+ public int writeIndex() {
+ return events().writeIndex();
+ }
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this).attribute("colWriter");
+ colWriter.dump(format);
+ format.endObject();
}
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
index e39176f89..d18b382fb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java
@@ -20,10 +20,10 @@ package org.apache.drill.exec.vector.accessor.writer;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
import org.apache.drill.exec.vector.accessor.writer.AbstractFixedWidthWriter.BaseFixedWidthWriter;
import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim;
import org.apache.drill.exec.vector.complex.UnionVector;
@@ -42,7 +42,7 @@ public class UnionVectorShim implements UnionShim {
static class DefaultListener implements VariantWriterListener {
- private UnionVectorShim shim;
+ private final UnionVectorShim shim;
private DefaultListener(UnionVectorShim shim) {
this.shim = shim;
@@ -57,8 +57,8 @@ public class UnionVectorShim implements UnionShim {
// will already be in the variant schema by the time we add the
// writer to the variant writer in a few steps from now.
- ValueVector memberVector = shim.vector.getMember(type);
- ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type);
+ final ValueVector memberVector = shim.vector.getMember(type);
+ final ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type);
return ColumnWriterFactory.buildColumnWriter(memberSchema, memberVector);
}
@@ -100,7 +100,7 @@ public class UnionVectorShim implements UnionShim {
@Override
public void bindWriter(UnionWriterImpl writer) {
this.writer = writer;
- ColumnWriterIndex index = writer.index();
+ final ColumnWriterIndex index = writer.index();
if (index != null) {
bindIndex(index);
}
@@ -116,6 +116,12 @@ public class UnionVectorShim implements UnionShim {
}
}
+ // Unions are complex: the listener should bind to the individual components
+ // as they are created.
+
+ @Override
+ public void bindListener(ColumnWriterListener listener) { }
+
@Override
public void setNull() {
@@ -133,7 +139,7 @@ public class UnionVectorShim implements UnionShim {
@Override
public ObjectWriter member(MinorType type) {
- AbstractObjectWriter colWriter = variants[type.ordinal()];
+ final AbstractObjectWriter colWriter = variants[type.ordinal()];
if (colWriter != null) {
return colWriter;
}
@@ -150,14 +156,14 @@ public class UnionVectorShim implements UnionShim {
@Override
public AbstractObjectWriter addMember(ColumnMetadata schema) {
- AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addMember(schema);
+ final AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addMember(schema);
addMember(colWriter);
return colWriter;
}
@Override
public AbstractObjectWriter addMember(MinorType type) {
- AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addType(type);
+ final AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addType(type);
addMember(colWriter);
return colWriter;
}
@@ -189,7 +195,7 @@ public class UnionVectorShim implements UnionShim {
*/
public void addMemberWriter(AbstractObjectWriter colWriter) {
- MinorType type = colWriter.schema().type();
+ final MinorType type = colWriter.schema().type();
assert variants[type.ordinal()] == null;
variants[type.ordinal()] = colWriter;
}
@@ -283,7 +289,10 @@ public class UnionVectorShim implements UnionShim {
* @return the writer for the types vector
*/
- public ColumnWriter typeWriter() { return typeWriter; }
+ public AbstractScalarWriterImpl typeWriter() { return typeWriter; }
+
+ @Override
+ public int writeIndex() { return typeWriter.writeIndex(); }
@Override
public int lastWriteIndex() { return typeWriter.lastWriteIndex(); }
@@ -301,4 +310,11 @@ public class UnionVectorShim implements UnionShim {
public void initTypeIndex(int typeFillCount) {
((BaseFixedWidthWriter) typeWriter).setLastWriteIndex(typeFillCount);
}
+
+ @Override
+ public void dump(HierarchicalFormatter format) {
+ format.startObject(this).attribute("typeWriter");
+ typeWriter.dump(format);
+ format.endObject();
+ }
} \ No newline at end of file
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
index 928317f09..a308ceaf4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.VariantMetadata;
import org.apache.drill.exec.vector.accessor.ArrayWriter;
+import org.apache.drill.exec.vector.accessor.ColumnWriter;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ObjectType;
import org.apache.drill.exec.vector.accessor.ObjectWriter;
@@ -55,7 +56,9 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
ObjectWriter member(MinorType type);
void setType(MinorType type);
+ @Override
int lastWriteIndex();
+ @Override
int rowStartIndex();
AbstractObjectWriter addMember(ColumnMetadata colSchema);
AbstractObjectWriter addMember(MinorType type);
@@ -71,6 +74,9 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
}
@Override
+ public ColumnWriter writer() { return writer; }
+
+ @Override
public VariantWriter variant() { return writer; }
@Override
@@ -136,6 +142,12 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
this.listener = listener;
}
+ // Unions are complex: listeners should bind to the components as they
+ // are created.
+
+ @Override
+ public void bindListener(ColumnWriterListener listener) { }
+
// The following are for coordinating with the shim.
public State state() { return state; }
@@ -192,7 +204,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
// conversion. Then set the type because, if the conversion is
// done, the type vector exists only after creating the member.
- ObjectWriter writer = shim.member(type);
+ final ObjectWriter writer = shim.member(type);
setType(type);
return writer;
}
@@ -222,7 +234,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
*/
protected void addMember(AbstractObjectWriter writer) {
- MinorType type = writer.schema().type();
+ final MinorType type = writer.schema().type();
// If the metadata has not yet been added to the variant
// schema, do so now. (Unfortunately, the default listener
@@ -334,7 +346,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
// Can look for exactly one period type as is done for Object[] below
throw new IllegalArgumentException("Period is ambiguous, please use scalar(type)");
} else if (value instanceof byte[]) {
- byte[] bytes = (byte[]) value;
+ final byte[] bytes = (byte[]) value;
scalar(MinorType.VARBINARY).setBytes(bytes, bytes.length);
} else if (value instanceof Byte) {
scalar(MinorType.TINYINT).setInt((Byte) value);
@@ -361,6 +373,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents {
}
}
+ @Override
public void dump(HierarchicalFormatter format) {
// TODO Auto-generated method stub
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
index c8f9f4896..1983d1799 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
@@ -18,6 +18,9 @@
package org.apache.drill.exec.vector.accessor.writer;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.WriterPosition;
+import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
/**
* Internal interface used to control the behavior
@@ -37,7 +40,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
* calls, can change.
*/
-public interface WriterEvents {
+public interface WriterEvents extends WriterPosition {
/**
* Tracks the write state of a tuple or variant to allow applying the correct
@@ -72,6 +75,40 @@ public interface WriterEvents {
}
/**
+ * Listener (callback) for vector overflow events. To be optionally
+ * implemented and bound by the client code of the writer. If no
+ * listener is bound, and a vector overflows, then an exception is
+ * thrown.
+ */
+
+ interface ColumnWriterListener {
+
+ /**
+ * Alert the listener that a vector has overflowed. Upon return,
+ * all writers must have a new set of buffers available, ready
+ * to accept the in-flight value that triggered the overflow.
+ *
+ * @param writer the writer that triggered the overflow
+ */
+
+ void overflowed(ScalarWriter writer);
+
+ /**
+ * A writer wants to expand its vector. Allows the listener to
+ * either allow the growth, or trigger and overflow to limit
+ * batch size.
+ *
+ * @param writer the writer that wishes to grow its vector
+ * @param delta the amount by which the vector is to grow
+ * @return true if the vector can be grown, false if the writer
+ * should instead trigger an overflow by calling
+ * <tt>overflowed()</tt>
+ */
+
+ boolean canExpand(ScalarWriter writer, int delta);
+ }
+
+ /**
* Bind the writer to a writer index.
*
* @param index the writer index (top level or nested for
@@ -81,6 +118,19 @@ public interface WriterEvents {
void bindIndex(ColumnWriterIndex index);
/**
+ * Bind a listener to the underlying vector writer. This listener reports on vector
+ * events (overflow, growth), and so is called only when the writer is backed by
+ * a vector. The listener is ignored (and never called) for dummy (non-projected)
+ * columns. If the column is compound (such as for a nullable or repeated column,
+ * or for a map), then the writer is bound to the individual components.
+ *
+ * @param listener
+ * the vector event listener to bind
+ */
+
+ void bindListener(ColumnWriterListener listener);
+
+ /**
* Start a write (batch) operation. Performs any vector initialization
* required at the start of a batch (especially for offset vectors.)
*/
@@ -143,4 +193,6 @@ public interface WriterEvents {
*/
void postRollover();
+
+ abstract void dump(HierarchicalFormatter format);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index 9cfe56e22..919f68956 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
import org.apache.drill.exec.vector.accessor.ValueType;
-import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter;
+import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl;
import org.joda.time.Period;
/**
@@ -32,7 +32,7 @@ import org.joda.time.Period;
* nor is it backed by a real vector, index or type.
*/
-public class DummyScalarWriter extends AbstractScalarWriter {
+public class DummyScalarWriter extends AbstractScalarWriterImpl {
public DummyScalarWriter(ColumnMetadata schema) {
this.schema = schema;