diff options
Diffstat (limited to 'exec/vector/src/main/java/org/apache/drill')
22 files changed, 386 insertions, 403 deletions
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java index 02efd6def..691dcf168 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnConversionFactory.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.vector.accessor; import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.vector.accessor.writer.ConcreteWriter; /** * Create a column type converter for the given column and base writer. @@ -36,5 +35,5 @@ public interface ColumnConversionFactory { * @return a new scalar writer to insert between the client and * the base vector */ - ConcreteWriter newWriter(ColumnMetadata colDefn, ConcreteWriter baseWriter); + ScalarWriter newWriter(ColumnMetadata colDefn, ScalarWriter baseWriter); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java index 5c2fde1b6..5117782dd 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriter.java @@ -18,9 +18,6 @@ package org.apache.drill.exec.vector.accessor; import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.vector.accessor.ScalarWriter.ColumnWriterListener; -import org.apache.drill.exec.vector.accessor.TupleWriter.TupleWriterListener; -import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener; /** * Generic information about a column writer including: @@ -33,36 +30,7 @@ import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener * testing.</li> */ -public interface ColumnWriter extends WriterPosition { - - interface TupleListenable { - - /** - * Bind a listener to the underlying map or map array column. Not valid if the - * underlying writer is a scalar or scalar array. - * - * @param listener - * the tuple listener to bind - */ - - void bindListener(TupleWriterListener listener); - } - - interface ScalarListenable { - /** - * Bind a listener to the underlying scalar column, or array of scalar - * columns. Not valid if the underlying writer is a map or array of maps. - * - * @param listener - * the column listener to bind - */ - - void bindListener(ColumnWriterListener listener); - } - - interface VariantListenable { - void bindListener(VariantWriterListener listener); - } +public interface ColumnWriter { /** * Return the object (structure) type of this writer. diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java index af7df6e88..fffd4e588 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ObjectWriter.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.vector.accessor; +import org.apache.drill.exec.vector.accessor.writer.WriterEvents; + /** * Represents a column within a tuple. A column can be an array, a scalar or a * tuple. Each has an associated column metadata (schema) and a writer. The @@ -49,4 +51,11 @@ public interface ObjectWriter extends ColumnWriter { ArrayWriter array(); VariantWriter variant(); + + /** + * The internal state behind this writer. To be used only by the + * implementation, not by the client. + */ + + WriterEvents events(); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java index 87c798890..db86570d1 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java @@ -21,8 +21,6 @@ import java.math.BigDecimal; import org.joda.time.Period; -import org.apache.drill.exec.vector.accessor.ColumnWriter.ScalarListenable; - /** * Represents a scalar value: a required column, a nullable column, * or one element within an array of scalars. @@ -44,41 +42,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriter.ScalarListenable; * {@see ScalarElementReader} */ -public interface ScalarWriter extends ColumnWriter, ScalarListenable { - - /** - * Listener (callback) for vector overflow events. To be optionally - * implemented and bound by the client code of the writer. If no - * listener is bound, and a vector overflows, then an exception is - * thrown. - */ - - public interface ColumnWriterListener { - - /** - * Alert the listener that a vector has overflowed. Upon return, - * all writers must have a new set of buffers available, ready - * to accept the in-flight value that triggered the overflow. - * - * @param writer the writer that triggered the overflow - */ - - void overflowed(ScalarWriter writer); - - /** - * A writer wants to expand its vector. Allows the listener to - * either allow the growth, or trigger and overflow to limit - * batch size. - * - * @param writer the writer that wishes to grow its vector - * @param delta the amount by which the vector is to grow - * @return true if the vector can be grown, false if the writer - * should instead trigger an overflow by calling - * <tt>overflowed()</tt> - */ - - boolean canExpand(ScalarWriter writer, int delta); - } +public interface ScalarWriter extends ColumnWriter { /** * Describe the type of the value. This is a compression of the diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java index 6f5d17931..05ff2fc5a 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/TupleWriter.java @@ -21,7 +21,6 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.ProjectionType; import org.apache.drill.exec.record.metadata.TupleMetadata; -import org.apache.drill.exec.vector.accessor.ColumnWriter.TupleListenable; /** * Writer for a tuple. A tuple is composed of columns with a fixed order and @@ -52,23 +51,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriter.TupleListenable; * @see {@link SingleMapWriter}, the class which this class replaces */ -public interface TupleWriter extends ColumnWriter, TupleListenable { - - /** - * Listener (callback) to handle requests to add a new column to a tuple (row - * or map). Implemented and bound by the client code that creates or uses the - * tuple writer. If no listener is bound, then an attempt to add a column - * throws an exception. - */ - - interface TupleWriterListener { - - ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column); - - ObjectWriter addColumn(TupleWriter tuple, MaterializedField field); - - ProjectionType projectionType(String columnName); - } +public interface TupleWriter extends ColumnWriter { /** * Unchecked exception thrown when attempting to access a column writer by diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java index d7b97e110..585696f52 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.vector.accessor.writer; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ColumnWriter; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ObjectWriter; @@ -95,7 +96,7 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { public static class ArrayObjectWriter extends AbstractObjectWriter { - private AbstractArrayWriter arrayWriter; + private final AbstractArrayWriter arrayWriter; public ArrayObjectWriter(AbstractArrayWriter arrayWriter) { this.arrayWriter = arrayWriter; @@ -105,6 +106,9 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { public ArrayWriter array() { return arrayWriter; } @Override + public ColumnWriter writer() { return arrayWriter; } + + @Override public WriterEvents events() { return arrayWriter; } @Override @@ -266,6 +270,12 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { } @Override + public void bindListener(ColumnWriterListener listener) { + elementObjWriter.events().bindListener(listener); + offsetsWriter.bindListener(listener); + } + + @Override public ObjectType type() { return ObjectType.ARRAY; } @Override @@ -342,6 +352,7 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents { public OffsetVectorWriter offsetWriter() { return offsetsWriter; } + @Override public void dump(HierarchicalFormatter format) { format .startObject(this) diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java index 87a181b26..527a69cdc 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractObjectWriter.java @@ -38,9 +38,6 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; public abstract class AbstractObjectWriter implements ObjectWriter { @Override - public ColumnMetadata schema() { return baseWriter().schema(); } - - @Override public ScalarWriter scalar() { throw new UnsupportedOperationException(); } @@ -60,42 +57,24 @@ public abstract class AbstractObjectWriter implements ObjectWriter { throw new UnsupportedOperationException(); } - public abstract WriterEvents events(); - - public ColumnWriter baseWriter() { - return (ColumnWriter) events(); - } - + public abstract ColumnWriter writer(); @Override - public ObjectType type() { return baseWriter().type(); } + public abstract WriterEvents events(); @Override - public boolean nullable() { return baseWriter().nullable(); } + public ColumnMetadata schema() { return writer().schema(); } @Override - public void setNull() { - baseWriter().setNull(); - } + public ObjectType type() { return writer().type(); } @Override - public void setObject(Object value) { - baseWriter().setObject(value); - } - - public abstract void dump(HierarchicalFormatter format); + public boolean nullable() { return writer().nullable(); } @Override - public int rowStartIndex() { - return baseWriter().rowStartIndex(); - } + public void setNull() { writer().setNull(); } @Override - public int lastWriteIndex() { - return baseWriter().lastWriteIndex(); - } + public void setObject(Object value) { writer().setObject(value); } - @Override - public int writeIndex() { - return baseWriter().writeIndex(); - } + public abstract void dump(HierarchicalFormatter format); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java index 9c2e9861d..a596e14af 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriter.java @@ -17,109 +17,53 @@ */ package org.apache.drill.exec.vector.accessor.writer; -import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.accessor.ColumnConversionFactory; -import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; -import org.apache.drill.exec.vector.accessor.ObjectType; +import java.math.BigDecimal; + import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; +import org.apache.drill.exec.vector.accessor.writer.WriterEvents.ColumnWriterListener; +import org.joda.time.Period; /** - * Column writer implementation that acts as the basis for the - * generated, vector-specific implementations. All set methods - * throw an exception; subclasses simply override the supported - * method(s). + * Base class for concrete scalar column writers including actual vector + * writers, wrappers for nullable types, and shims used to convert types. */ -public abstract class AbstractScalarWriter extends ConcreteWriter { - - public static class ScalarObjectWriter extends AbstractObjectWriter { - - private ConcreteWriter scalarWriter; - - public ScalarObjectWriter(ConcreteWriter scalarWriter) { - final ColumnMetadata metadata = scalarWriter.schema(); - final ColumnConversionFactory factory = metadata.typeConverter(); - if (factory == null) { - this.scalarWriter = scalarWriter; - } else { - this.scalarWriter = factory.newWriter(metadata, scalarWriter); - } - } - - @Override - public ScalarWriter scalar() { return scalarWriter; } - - @Override - public WriterEvents events() { return scalarWriter; } - - @Override - public void dump(HierarchicalFormatter format) { - format - .startObject(this) - .attribute("scalarWriter"); - scalarWriter.dump(format); - format.endObject(); - } - } - - protected ColumnMetadata schema; - - /** - * Indicates the position in the vector to write. Set via an object so that - * all writers (within the same subtree) can agree on the write position. - * For example, all top-level, simple columns see the same row index. - * All columns within a repeated map see the same (inner) index, etc. - */ - - protected ColumnWriterIndex vectorIndex; +public abstract class AbstractScalarWriter implements ScalarWriter { @Override - public ObjectType type() { return ObjectType.SCALAR; } - - public void bindSchema(ColumnMetadata schema) { - this.schema = schema; - } - - @Override - public void bindIndex(ColumnWriterIndex vectorIndex) { - this.vectorIndex = vectorIndex; - } - - @Override - public int rowStartIndex() { - return vectorIndex.rowStartIndex(); + public void setObject(Object value) { + if (value == null) { + setNull(); + } else if (value instanceof Integer) { + setInt((Integer) value); + } else if (value instanceof Long) { + setLong((Long) value); + } else if (value instanceof String) { + setString((String) value); + } else if (value instanceof BigDecimal) { + setDecimal((BigDecimal) value); + } else if (value instanceof Period) { + setPeriod((Period) value); + } else if (value instanceof byte[]) { + final byte[] bytes = (byte[]) value; + setBytes(bytes, bytes.length); + } else if (value instanceof Byte) { + setInt((Byte) value); + } else if (value instanceof Short) { + setInt((Short) value); + } else if (value instanceof Double) { + setDouble((Double) value); + } else if (value instanceof Float) { + setDouble((Float) value); + } else { + throw conversionError(value.getClass().getSimpleName()); + } } - @Override - public int writeIndex() { - return vectorIndex.vectorIndex(); + protected UnsupportedConversionError conversionError(String javaType) { + return UnsupportedConversionError.writeError(schema(), javaType); } - @Override - public ColumnMetadata schema() { return schema; } - - public abstract BaseDataValueVector vector(); - - @Override - public void startWrite() { } - - @Override - public void startRow() { } - - @Override - public void endArrayValue() { } - - @Override - public void saveRow() { } - - @Override - public void dump(HierarchicalFormatter format) { - format - .startObject(this) - .attributeIdentity("vector", vector()) - .attribute("schema", vector().getField()) - .endObject(); - } + public void bindListener(ColumnWriterListener listener) { } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java new file mode 100644 index 000000000..c1306b618 --- /dev/null +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractScalarWriterImpl.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor.writer; + +import org.apache.drill.exec.record.metadata.ColumnMetadata; +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.accessor.ColumnConversionFactory; +import org.apache.drill.exec.vector.accessor.ColumnWriter; +import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; +import org.apache.drill.exec.vector.accessor.ObjectType; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; + +/** + * Column writer implementation that acts as the basis for the + * generated, vector-specific implementations. All set methods + * throw an exception; subclasses simply override the supported + * method(s). + */ + +public abstract class AbstractScalarWriterImpl extends AbstractScalarWriter implements WriterEvents { + + /** + * Wraps a scalar writer and its event handler to provide a uniform + * JSON-like interface for all writer types. + * <p> + * The client sees only the scalar writer API. But, internals need + * visibility into a rather complex set of events to orchestrate + * vector events: mostly sent to the writer, but some times sent + * from the writer, such as vector overflow. Separating the two + * concepts makes it easier to add type-conversion shims on top of + * the actual vector writer. + */ + public static class ScalarObjectWriter extends AbstractObjectWriter { + + private final WriterEvents writerEvents; + private ScalarWriter scalarWriter; + + public ScalarObjectWriter(AbstractScalarWriterImpl scalarWriter) { + final ColumnMetadata metadata = scalarWriter.schema(); + final ColumnConversionFactory factory = metadata.typeConverter(); + writerEvents = scalarWriter; + if (factory == null) { + this.scalarWriter = scalarWriter; + } else { + this.scalarWriter = factory.newWriter(metadata, scalarWriter); + } + } + + @Override + public ScalarWriter scalar() { return scalarWriter; } + + @Override + public ColumnWriter writer() { return scalarWriter; } + + @Override + public WriterEvents events() { return writerEvents; } + + @Override + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attribute("scalarWriter"); + writerEvents.dump(format); + format.endObject(); + } + } + + protected ColumnMetadata schema; + + /** + * Indicates the position in the vector to write. Set via an object so that + * all writers (within the same subtree) can agree on the write position. + * For example, all top-level, simple columns see the same row index. + * All columns within a repeated map see the same (inner) index, etc. + */ + + protected ColumnWriterIndex vectorIndex; + + @Override + public ObjectType type() { return ObjectType.SCALAR; } + + public void bindSchema(ColumnMetadata schema) { + this.schema = schema; + } + + @Override + public void bindIndex(ColumnWriterIndex vectorIndex) { + this.vectorIndex = vectorIndex; + } + + @Override + public int rowStartIndex() { + return vectorIndex.rowStartIndex(); + } + + @Override + public int writeIndex() { + return vectorIndex.vectorIndex(); + } + + @Override + public ColumnMetadata schema() { return schema; } + + public abstract BaseDataValueVector vector(); + + @Override + public void startWrite() { } + + @Override + public void startRow() { } + + @Override + public void endArrayValue() { } + + @Override + public void saveRow() { } + + @Override + public void dump(HierarchicalFormatter format) { + format + .startObject(this) + .attributeIdentity("vector", vector()) + .attribute("schema", vector().getField()) + .endObject(); + } +} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java index aee806df8..006608125 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractTupleWriter.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.ProjectionType; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ColumnWriter; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ObjectWriter; @@ -104,13 +105,16 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { public static class TupleObjectWriter extends AbstractObjectWriter { - private AbstractTupleWriter tupleWriter; + private final AbstractTupleWriter tupleWriter; public TupleObjectWriter(AbstractTupleWriter tupleWriter) { this.tupleWriter = tupleWriter; } @Override + public ColumnWriter writer() { return tupleWriter; } + + @Override public TupleWriter tuple() { return tupleWriter; } @Override @@ -126,11 +130,27 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { } } + /** + * Listener (callback) to handle requests to add a new column to a tuple (row + * or map). Implemented and bound by the client code that creates or uses the + * tuple writer. If no listener is bound, then an attempt to add a column + * throws an exception. + */ + + public static interface TupleWriterListener { + + ObjectWriter addColumn(TupleWriter tuple, ColumnMetadata column); + + ObjectWriter addColumn(TupleWriter tuple, MaterializedField field); + + ProjectionType projectionType(String columnName); + } + protected final TupleMetadata tupleSchema; protected final List<AbstractObjectWriter> writers; protected ColumnWriterIndex vectorIndex; protected ColumnWriterIndex childIndex; - protected TupleWriterListener listener; + protected AbstractTupleWriter.TupleWriterListener listener; protected State state = State.IDLE; protected AbstractTupleWriter(TupleMetadata schema, List<AbstractObjectWriter> writers) { @@ -174,7 +194,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { public int addColumnWriter(AbstractObjectWriter colWriter) { assert writers.size() == tupleSchema.size(); - int colIndex = tupleSchema.addColumn(colWriter.schema()); + final int colIndex = tupleSchema.addColumn(colWriter.schema()); writers.add(colWriter); colWriter.events().bindIndex(childIndex); if (state != State.IDLE) { @@ -197,7 +217,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { if (listener == null) { throw new UnsupportedOperationException("addColumn"); } - AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column); + final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, column); return addColumnWriter(colWriter); } @@ -206,7 +226,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { if (listener == null) { throw new UnsupportedOperationException("addColumn"); } - AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field); + final AbstractObjectWriter colWriter = (AbstractObjectWriter) listener.addColumn(this, field); return addColumnWriter(colWriter); } @@ -317,7 +337,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { @Override public ObjectWriter column(String colName) { - int index = tupleSchema.index(colName); + final int index = tupleSchema.index(colName); if (index == -1) { throw new UndefinedColumnException(colName); } @@ -331,7 +351,7 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { @Override public void setObject(Object value) { - Object values[] = (Object[]) value; + final Object values[] = (Object[]) value; if (values.length != tupleSchema.size()) { throw new IllegalArgumentException( String.format("Map %s has %d columns, but value array has " + @@ -404,11 +424,14 @@ public abstract class AbstractTupleWriter implements TupleWriter, WriterEvents { return vectorIndex.vectorIndex(); } - @Override public void bindListener(TupleWriterListener listener) { this.listener = listener; } + @Override + public void bindListener(ColumnWriterListener listener) { } + + @Override public void dump(HierarchicalFormatter format) { format .startObject(this) diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConvertor.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java index 0d0bc8803..55cd991a7 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConvertor.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractWriteConverter.java @@ -20,11 +20,9 @@ package org.apache.drill.exec.vector.accessor.writer; import java.math.BigDecimal; import org.apache.drill.exec.record.metadata.ColumnMetadata; -import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.ValueType; -import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.joda.time.Period; /** @@ -38,20 +36,12 @@ import org.joda.time.Period; * for an int column in the case above. */ -// TODO: This organization works fine, but is a bit heavy-weight. -// It may be time to think about separating the pure writer aspect of -// a column writer from its plumbing aspects. That is, the base -// ConcreteWriter class combines the public API (ScalarWriter) with -// the internal implementation (WriterEvents) into a single class. -// Might be worth using composition rather than inheritance to keep -// these aspects distinct. +public class AbstractWriteConverter extends AbstractScalarWriter { -public class AbstractWriteConvertor extends ConcreteWriter { + private final ScalarWriter baseWriter; - private final ConcreteWriter baseWriter; - - public AbstractWriteConvertor(ScalarWriter baseWriter) { - this.baseWriter = (ConcreteWriter) baseWriter; + public AbstractWriteConverter(ScalarWriter baseWriter) { + this.baseWriter = baseWriter; } @Override @@ -60,31 +50,6 @@ public class AbstractWriteConvertor extends ConcreteWriter { } @Override - public int lastWriteIndex() { - return baseWriter.lastWriteIndex(); - } - - @Override - public void restartRow() { - baseWriter.restartRow(); - } - - @Override - public void endWrite() { - baseWriter.endWrite(); - } - - @Override - public void preRollover() { - baseWriter.preRollover(); - } - - @Override - public void postRollover() { - baseWriter.postRollover(); - } - - @Override public ObjectType type() { return baseWriter.type(); } @@ -105,46 +70,6 @@ public class AbstractWriteConvertor extends ConcreteWriter { } @Override - public int rowStartIndex() { - return baseWriter.rowStartIndex(); - } - - @Override - public int writeIndex() { - return baseWriter.writeIndex(); - } - - @Override - public void bindListener(ColumnWriterListener listener) { - baseWriter.bindListener(listener); - } - - @Override - public void bindIndex(ColumnWriterIndex index) { - baseWriter.bindIndex(index); - } - - @Override - public void startWrite() { - baseWriter.startWrite(); - } - - @Override - public void startRow() { - baseWriter.startRow(); - } - - @Override - public void endArrayValue() { - baseWriter.endArrayValue(); - } - - @Override - public void saveRow() { - baseWriter.saveRow(); - } - - @Override public void setInt(int value) { baseWriter.setInt(value); } @@ -178,9 +103,4 @@ public class AbstractWriteConvertor extends ConcreteWriter { public void setPeriod(Period value) { baseWriter.setPeriod(value); } - - @Override - public void dump(HierarchicalFormatter format) { - baseWriter.dump(format); - } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java index 7e121495f..e2e63d2ee 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java @@ -129,7 +129,7 @@ import io.netty.buffer.DrillBuf; * a roll-over when a vector overflows. */ -public abstract class BaseScalarWriter extends AbstractScalarWriter { +public abstract class BaseScalarWriter extends AbstractScalarWriterImpl { public static final int MIN_BUFFER_SIZE = 256; diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java index 6ee5bbcf7..206a0a66a 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ColumnWriterFactory.java @@ -26,7 +26,7 @@ import org.apache.drill.exec.vector.NullableVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.ColumnAccessorUtils; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter; -import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl.ScalarObjectWriter; import org.apache.drill.exec.vector.accessor.writer.dummy.DummyArrayWriter; import org.apache.drill.exec.vector.accessor.writer.dummy.DummyScalarWriter; import org.apache.drill.exec.vector.complex.RepeatedValueVector; diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java deleted file mode 100644 index 549431f76..000000000 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ConcreteWriter.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.vector.accessor.writer; - -import java.math.BigDecimal; - -import org.apache.drill.exec.vector.accessor.ScalarWriter; -import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; -import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; -import org.joda.time.Period; - -/** - * Base class for concrete scalar column writers including actual vector - * writers, wrappers for nullable types, and shims used to convert types. - */ - -public abstract class ConcreteWriter implements ScalarWriter, WriterEvents { - - @Override - public void setObject(Object value) { - if (value == null) { - setNull(); - } else if (value instanceof Integer) { - setInt((Integer) value); - } else if (value instanceof Long) { - setLong((Long) value); - } else if (value instanceof String) { - setString((String) value); - } else if (value instanceof BigDecimal) { - setDecimal((BigDecimal) value); - } else if (value instanceof Period) { - setPeriod((Period) value); - } else if (value instanceof byte[]) { - final byte[] bytes = (byte[]) value; - setBytes(bytes, bytes.length); - } else if (value instanceof Byte) { - setInt((Byte) value); - } else if (value instanceof Short) { - setInt((Short) value); - } else if (value instanceof Double) { - setDouble((Double) value); - } else if (value instanceof Float) { - setDouble((Float) value); - } else { - throw conversionError(value.getClass().getSimpleName()); - } - } - - protected UnsupportedConversionError conversionError(String javaType) { - return UnsupportedConversionError.writeError(schema(), javaType); - } - - abstract void dump(HierarchicalFormatter format); -} diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java index 1167d3385..d8c8947ca 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/EmptyListShim.java @@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectWriter; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; @@ -45,6 +46,9 @@ public class EmptyListShim implements UnionShim { public void bindIndex(ColumnWriterIndex index) { } @Override + public void bindListener(ColumnWriterListener listener) { } + + @Override public void startWrite() { } @Override @@ -127,6 +131,9 @@ public class EmptyListShim implements UnionShim { public int rowStartIndex() { return 0; } @Override + public int writeIndex() { return 0; } + + @Override public void addMember(AbstractObjectWriter colWriter) { // This shim has no types. If this is called, then the shim replacement @@ -135,4 +142,8 @@ public class EmptyListShim implements UnionShim { throw new UnsupportedOperationException(); } + @Override + public void dump(HierarchicalFormatter format) { + format.startObject(this).endObject(); + } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java index 75ef57fc4..e9fe11ae4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java @@ -23,12 +23,12 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.NullableVector; import org.apache.drill.exec.vector.accessor.ColumnAccessors.UInt1ColumnWriter; -import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ValueType; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.joda.time.Period; -public class NullableScalarWriter extends AbstractScalarWriter { +public class NullableScalarWriter extends AbstractScalarWriterImpl { public static final class ChildIndex implements ColumnWriterIndex { @@ -93,7 +93,7 @@ public class NullableScalarWriter extends AbstractScalarWriter { @Override public void bindIndex(ColumnWriterIndex index) { writerIndex = index; - ColumnWriterIndex childIndex = new ChildIndex(index); + final ColumnWriterIndex childIndex = new ChildIndex(index); isSetWriter.bindIndex(childIndex); baseWriter.bindIndex(childIndex); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java index f271bfab0..c761a95de 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java @@ -22,8 +22,9 @@ import java.math.BigDecimal; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; +import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.BaseArrayWriter; -import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter.ScalarObjectWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl.ScalarObjectWriter; import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.joda.time.Period; @@ -60,7 +61,7 @@ public class ScalarArrayWriter extends BaseArrayWriter { public final void nextElement() { next(); } } - private final ConcreteWriter elementWriter; + private final ScalarWriter elementWriter; public ScalarArrayWriter(ColumnMetadata schema, RepeatedValueVector vector, BaseScalarWriter elementWriter) { @@ -70,7 +71,7 @@ public class ScalarArrayWriter extends BaseArrayWriter { // Save the writer from the scalar object writer created above // which may have wrapped the element writer in a type convertor. - this.elementWriter = (ConcreteWriter) elementObjWriter.scalar(); + this.elementWriter = elementObjWriter.scalar(); } public static ArrayObjectWriter build(ColumnMetadata schema, @@ -83,7 +84,7 @@ public class ScalarArrayWriter extends BaseArrayWriter { public void bindIndex(ColumnWriterIndex index) { elementIndex = new ScalarElementWriterIndex(); super.bindIndex(index); - elementWriter.bindIndex(elementIndex); + elementObjWriter.events().bindIndex(elementIndex); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java index 6dd947503..66b81f976 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/SimpleListShim.java @@ -21,8 +21,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectWriter; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim; - import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; /** @@ -56,6 +56,11 @@ public class SimpleListShim implements UnionShim { } @Override + public void bindListener(ColumnWriterListener listener) { + colWriter.events().bindListener(listener); + } + + @Override public boolean hasType(MinorType type) { return type == colWriter.schema().type(); } @@ -185,11 +190,23 @@ public class SimpleListShim implements UnionShim { @Override public int lastWriteIndex() { - return colWriter.lastWriteIndex(); + return events().lastWriteIndex(); } @Override public int rowStartIndex() { - return colWriter.rowStartIndex(); + return events().rowStartIndex(); + } + + @Override + public int writeIndex() { + return events().writeIndex(); + } + + @Override + public void dump(HierarchicalFormatter format) { + format.startObject(this).attribute("colWriter"); + colWriter.dump(format); + format.endObject(); } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java index e39176f89..d18b382fb 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionVectorShim.java @@ -20,10 +20,10 @@ package org.apache.drill.exec.vector.accessor.writer; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.accessor.ColumnWriter; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectWriter; import org.apache.drill.exec.vector.accessor.VariantWriter.VariantWriterListener; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; import org.apache.drill.exec.vector.accessor.writer.AbstractFixedWidthWriter.BaseFixedWidthWriter; import org.apache.drill.exec.vector.accessor.writer.UnionWriterImpl.UnionShim; import org.apache.drill.exec.vector.complex.UnionVector; @@ -42,7 +42,7 @@ public class UnionVectorShim implements UnionShim { static class DefaultListener implements VariantWriterListener { - private UnionVectorShim shim; + private final UnionVectorShim shim; private DefaultListener(UnionVectorShim shim) { this.shim = shim; @@ -57,8 +57,8 @@ public class UnionVectorShim implements UnionShim { // will already be in the variant schema by the time we add the // writer to the variant writer in a few steps from now. - ValueVector memberVector = shim.vector.getMember(type); - ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type); + final ValueVector memberVector = shim.vector.getMember(type); + final ColumnMetadata memberSchema = shim.writer.variantSchema().addType(type); return ColumnWriterFactory.buildColumnWriter(memberSchema, memberVector); } @@ -100,7 +100,7 @@ public class UnionVectorShim implements UnionShim { @Override public void bindWriter(UnionWriterImpl writer) { this.writer = writer; - ColumnWriterIndex index = writer.index(); + final ColumnWriterIndex index = writer.index(); if (index != null) { bindIndex(index); } @@ -116,6 +116,12 @@ public class UnionVectorShim implements UnionShim { } } + // Unions are complex: the listener should bind to the individual components + // as they are created. + + @Override + public void bindListener(ColumnWriterListener listener) { } + @Override public void setNull() { @@ -133,7 +139,7 @@ public class UnionVectorShim implements UnionShim { @Override public ObjectWriter member(MinorType type) { - AbstractObjectWriter colWriter = variants[type.ordinal()]; + final AbstractObjectWriter colWriter = variants[type.ordinal()]; if (colWriter != null) { return colWriter; } @@ -150,14 +156,14 @@ public class UnionVectorShim implements UnionShim { @Override public AbstractObjectWriter addMember(ColumnMetadata schema) { - AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addMember(schema); + final AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addMember(schema); addMember(colWriter); return colWriter; } @Override public AbstractObjectWriter addMember(MinorType type) { - AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addType(type); + final AbstractObjectWriter colWriter = (AbstractObjectWriter) writer.listener().addType(type); addMember(colWriter); return colWriter; } @@ -189,7 +195,7 @@ public class UnionVectorShim implements UnionShim { */ public void addMemberWriter(AbstractObjectWriter colWriter) { - MinorType type = colWriter.schema().type(); + final MinorType type = colWriter.schema().type(); assert variants[type.ordinal()] == null; variants[type.ordinal()] = colWriter; } @@ -283,7 +289,10 @@ public class UnionVectorShim implements UnionShim { * @return the writer for the types vector */ - public ColumnWriter typeWriter() { return typeWriter; } + public AbstractScalarWriterImpl typeWriter() { return typeWriter; } + + @Override + public int writeIndex() { return typeWriter.writeIndex(); } @Override public int lastWriteIndex() { return typeWriter.lastWriteIndex(); } @@ -301,4 +310,11 @@ public class UnionVectorShim implements UnionShim { public void initTypeIndex(int typeFillCount) { ((BaseFixedWidthWriter) typeWriter).setLastWriteIndex(typeFillCount); } + + @Override + public void dump(HierarchicalFormatter format) { + format.startObject(this).attribute("typeWriter"); + typeWriter.dump(format); + format.endObject(); + } }
\ No newline at end of file diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java index 928317f09..a308ceaf4 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/UnionWriterImpl.java @@ -23,6 +23,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.record.metadata.VariantMetadata; import org.apache.drill.exec.vector.accessor.ArrayWriter; +import org.apache.drill.exec.vector.accessor.ColumnWriter; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ObjectType; import org.apache.drill.exec.vector.accessor.ObjectWriter; @@ -55,7 +56,9 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { ObjectWriter member(MinorType type); void setType(MinorType type); + @Override int lastWriteIndex(); + @Override int rowStartIndex(); AbstractObjectWriter addMember(ColumnMetadata colSchema); AbstractObjectWriter addMember(MinorType type); @@ -71,6 +74,9 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { } @Override + public ColumnWriter writer() { return writer; } + + @Override public VariantWriter variant() { return writer; } @Override @@ -136,6 +142,12 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { this.listener = listener; } + // Unions are complex: listeners should bind to the components as they + // are created. + + @Override + public void bindListener(ColumnWriterListener listener) { } + // The following are for coordinating with the shim. public State state() { return state; } @@ -192,7 +204,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { // conversion. Then set the type because, if the conversion is // done, the type vector exists only after creating the member. - ObjectWriter writer = shim.member(type); + final ObjectWriter writer = shim.member(type); setType(type); return writer; } @@ -222,7 +234,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { */ protected void addMember(AbstractObjectWriter writer) { - MinorType type = writer.schema().type(); + final MinorType type = writer.schema().type(); // If the metadata has not yet been added to the variant // schema, do so now. (Unfortunately, the default listener @@ -334,7 +346,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { // Can look for exactly one period type as is done for Object[] below throw new IllegalArgumentException("Period is ambiguous, please use scalar(type)"); } else if (value instanceof byte[]) { - byte[] bytes = (byte[]) value; + final byte[] bytes = (byte[]) value; scalar(MinorType.VARBINARY).setBytes(bytes, bytes.length); } else if (value instanceof Byte) { scalar(MinorType.TINYINT).setInt((Byte) value); @@ -361,6 +373,7 @@ public class UnionWriterImpl implements VariantWriter, WriterEvents { } } + @Override public void dump(HierarchicalFormatter format) { // TODO Auto-generated method stub diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java index c8f9f4896..1983d1799 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.vector.accessor.writer; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; +import org.apache.drill.exec.vector.accessor.ScalarWriter; +import org.apache.drill.exec.vector.accessor.WriterPosition; +import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; /** * Internal interface used to control the behavior @@ -37,7 +40,7 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; * calls, can change. */ -public interface WriterEvents { +public interface WriterEvents extends WriterPosition { /** * Tracks the write state of a tuple or variant to allow applying the correct @@ -72,6 +75,40 @@ public interface WriterEvents { } /** + * Listener (callback) for vector overflow events. To be optionally + * implemented and bound by the client code of the writer. If no + * listener is bound, and a vector overflows, then an exception is + * thrown. + */ + + interface ColumnWriterListener { + + /** + * Alert the listener that a vector has overflowed. Upon return, + * all writers must have a new set of buffers available, ready + * to accept the in-flight value that triggered the overflow. + * + * @param writer the writer that triggered the overflow + */ + + void overflowed(ScalarWriter writer); + + /** + * A writer wants to expand its vector. Allows the listener to + * either allow the growth, or trigger and overflow to limit + * batch size. + * + * @param writer the writer that wishes to grow its vector + * @param delta the amount by which the vector is to grow + * @return true if the vector can be grown, false if the writer + * should instead trigger an overflow by calling + * <tt>overflowed()</tt> + */ + + boolean canExpand(ScalarWriter writer, int delta); + } + + /** * Bind the writer to a writer index. * * @param index the writer index (top level or nested for @@ -81,6 +118,19 @@ public interface WriterEvents { void bindIndex(ColumnWriterIndex index); /** + * Bind a listener to the underlying vector writer. This listener reports on vector + * events (overflow, growth), and so is called only when the writer is backed by + * a vector. The listener is ignored (and never called) for dummy (non-projected) + * columns. If the column is compound (such as for a nullable or repeated column, + * or for a map), then the writer is bound to the individual components. + * + * @param listener + * the vector event listener to bind + */ + + void bindListener(ColumnWriterListener listener); + + /** * Start a write (batch) operation. Performs any vector initialization * required at the start of a batch (especially for offset vectors.) */ @@ -143,4 +193,6 @@ public interface WriterEvents { */ void postRollover(); + + abstract void dump(HierarchicalFormatter format); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java index 9cfe56e22..919f68956 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java @@ -23,7 +23,7 @@ import org.apache.drill.exec.record.metadata.ColumnMetadata; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.accessor.ColumnWriterIndex; import org.apache.drill.exec.vector.accessor.ValueType; -import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriter; +import org.apache.drill.exec.vector.accessor.writer.AbstractScalarWriterImpl; import org.joda.time.Period; /** @@ -32,7 +32,7 @@ import org.joda.time.Period; * nor is it backed by a real vector, index or type. */ -public class DummyScalarWriter extends AbstractScalarWriter { +public class DummyScalarWriter extends AbstractScalarWriterImpl { public DummyScalarWriter(ColumnMetadata schema) { this.schema = schema; |