diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-07-31 23:46:26 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-08-06 16:44:23 -0700 |
commit | 49d238eea601a6764129b9d64cf9bca440209e12 (patch) | |
tree | 6b9c7d344bbf248e0f1c66ca5f37fbcc3f92e8d9 /exec/java-exec/src/main | |
parent | 450d891eb9cc9a77e537537e5d70ecb94de697b8 (diff) |
DRILL-1252: Implement Complex parquet and json writers
Diffstat (limited to 'exec/java-exec/src/main')
15 files changed, 763 insertions, 153 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java index 6a7fb8623..6b6065f6b 100644 --- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java @@ -23,18 +23,35 @@ package org.apache.drill.exec.store; import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import java.io.IOException; import java.lang.UnsupportedOperationException; public abstract class AbstractRecordWriter implements RecordWriter { + @Override + public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException("Doesn't support writing Map'"); + } + + @Override + public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException("Doesn't support writing RepeatedMap"); + } + + @Override + public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException("Doesn't support writing RepeatedList"); + } + <#list vv.types as type> <#list type.minor as minor> <#list vv.modes as mode> @Override - public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException { - throw new UnsupportedOperationException("Doesn't support writing '${mode.prefix}${minor.class}Holder'"); + public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException("Doesn't support writing '${mode.prefix}${minor.class}'"); } </#list> </#list> diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java index b58f24cf9..e76178a59 100644 --- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java @@ -25,11 +25,9 @@ package org.apache.drill.exec.store; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.exec.expr.holders.*; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.RecordValueAccessor; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import java.io.IOException; import java.util.List; @@ -40,61 +38,44 @@ import java.util.Map; public class EventBasedRecordWriter { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EventBasedRecordWriter.class); - private BatchSchema schema; - private RecordValueAccessor rva; + private VectorAccessible batch; private RecordWriter recordWriter; - private List<FieldWriter> fieldWriters; + private List<FieldConverter> fieldConverters; - static private Map<MajorType, Class<? extends FieldWriter>> typeClassMap; - - static { - typeClassMap = Maps.newHashMap(); - -<#list vv.types as type> - <#list type.minor as minor> - <#list vv.modes as mode> - typeClassMap.put(${mode.prefix}${minor.class}Holder.TYPE, ${mode.prefix}${minor.class}FieldWriter.class); - </#list> - </#list> -</#list> - } - - public EventBasedRecordWriter(BatchSchema schema, RecordValueAccessor rva, RecordWriter recordWriter) - throws IOException { - this.schema = schema; - this.rva = rva; + public EventBasedRecordWriter(VectorAccessible batch, RecordWriter recordWriter) + throws IOException { + this.batch = batch; this.recordWriter = recordWriter; initFieldWriters(); } - public int write() throws IOException { + public int write(int recordCount) throws IOException { int counter = 0; - rva.resetIterator(); - while(rva.next()) { + for (; counter < recordCount; counter++) { recordWriter.startRecord(); // write the current record - int fieldId = 0; - for (MaterializedField field : schema) { - fieldWriters.get(fieldId).writeField(); - fieldId++; + for (FieldConverter converter : fieldConverters) { + converter.setPosition(counter); + converter.startField(); + converter.writeField(); + converter.endField(); } recordWriter.endRecord(); - counter++; } return counter; } private void initFieldWriters() throws IOException { - fieldWriters = Lists.newArrayList(); + fieldConverters = Lists.newArrayList(); try { - for (int i = 0; i < schema.getFieldCount(); i++) { - MajorType mt = schema.getColumn(i).getType(); - MajorType newMt = MajorType.newBuilder().setMinorType(mt.getMinorType()).setMode(mt.getMode()).build(); - fieldWriters.add(i, typeClassMap.get(newMt) - .getConstructor(EventBasedRecordWriter.class, int.class).newInstance(this, i)); + int fieldId = 0; + for (VectorWrapper w : batch) { + FieldReader reader = w.getValueVector().getAccessor().getReader(); + FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getLastName(), reader); + fieldConverters.add(converter); } } catch(Exception e) { logger.error("Failed to create FieldWriter.", e); @@ -102,33 +83,64 @@ public class EventBasedRecordWriter { } } - abstract class FieldWriter { + public static abstract class FieldConverter { protected int fieldId; + protected String fieldName; + protected FieldReader reader; - public FieldWriter(int fieldId) { + public FieldConverter(int fieldId, String fieldName, FieldReader reader) { this.fieldId = fieldId; + this.fieldName = fieldName; + this.reader = reader; } - public abstract void writeField() throws IOException; - } - -<#list vv.types as type> - <#list type.minor as minor> - <#list vv.modes as mode> - class ${mode.prefix}${minor.class}FieldWriter extends FieldWriter { - private ${mode.prefix}${minor.class}Holder holder = new ${mode.prefix}${minor.class}Holder(); + public void setPosition(int index) { + reader.setPosition(index); + } - public ${mode.prefix}${minor.class}FieldWriter(int fieldId) { - super(fieldId); + public void startField() throws IOException { + // no op } - public void writeField() throws IOException { - rva.getFieldById(fieldId, holder); - recordWriter.add${mode.prefix}${minor.class}Holder(fieldId, holder); + public void endField() throws IOException { + // no op } + + public abstract void writeField() throws IOException; } - </#list> - </#list> -</#list> + public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) { + switch (reader.getType().getMinorType()) { + case MAP: + switch (reader.getType().getMode()) { + case REQUIRED: + case OPTIONAL: + return recordWriter.getNewMapConverter(fieldId, fieldName, reader); + case REPEATED: + return recordWriter.getNewRepeatedMapConverter(fieldId, fieldName, reader); + } + + case LIST: + switch (reader.getType().getMode()) { + case REPEATED: + return recordWriter.getNewRepeatedListConverter(fieldId, fieldName, reader); + } + + <#list vv.types as type> + <#list type.minor as minor> + case ${minor.class?upper_case}: + switch (reader.getType().getMode()) { + case REQUIRED: + return recordWriter.getNew${minor.class}Converter(fieldId, fieldName, reader); + case OPTIONAL: + return recordWriter.getNewNullable${minor.class}Converter(fieldId, fieldName, reader); + case REPEATED: + return recordWriter.getNewRepeated${minor.class}Converter(fieldId, fieldName, reader); + } + </#list> + </#list> + + } + throw new UnsupportedOperationException(); + } }
\ No newline at end of file diff --git a/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java new file mode 100644 index 000000000..d1a6d4e3b --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.joda.time.DateTimeUtils; +import parquet.io.api.Binary; + +import java.lang.Override; +import java.lang.RuntimeException; + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="org/apache/drill/exec/store/JSONOutputRecordWriter.java" /> +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.store; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.collect.Lists; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.store.parquet.ParquetTypeHelper; +import org.apache.drill.exec.vector.*; +import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import parquet.io.api.RecordConsumer; +import parquet.schema.MessageType; +import parquet.io.api.Binary; +import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; + + +import org.apache.drill.common.types.TypeProtos; + +import org.joda.time.DateTimeUtils; + +import java.io.IOException; +import java.lang.UnsupportedOperationException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * Abstract implementation of RecordWriter interface which exposes interface: + * {@link #writeHeader(List)} + * {@link #addField(int,String)} + * to output the data in string format instead of implementing addField for each type holder. + * + * This is useful for text format writers such as CSV, TSV etc. + */ +public abstract class JSONOutputRecordWriter extends AbstractRecordWriter implements RecordWriter { + + protected JsonGenerator gen; + +<#list vv.types as type> + <#list type.minor as minor> + <#list vv.modes as mode> + <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) /> + @Override + public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) { + return new ${mode.prefix}${minor.class}JsonConverter(fieldId, fieldName, reader); + } + + public class ${mode.prefix}${minor.class}JsonConverter extends FieldConverter { + + public ${mode.prefix}${minor.class}JsonConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + @Override + public void startField() throws IOException { + gen.writeFieldName(fieldName); + } + + @Override + public void writeField() throws IOException { + <#if mode.prefix == "Nullable" > + if (!reader.isSet()) { + gen.writeNull(); + return; + } + <#elseif mode.prefix == "Repeated" > + // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements + if (reader.size() == 0) { + return; + } + gen.writeStartArray(); + for (int i = 0; i < reader.size(); i++) { + <#else> + </#if> + + <#if minor.class == "TinyInt" || + minor.class == "UInt1" || + minor.class == "UInt2" || + minor.class == "SmallInt" || + minor.class == "Int" || + minor.class == "Decimal9" || + minor.class == "Float4" || + minor.class == "BigInt" || + minor.class == "Decimal18" || + minor.class == "UInt8" || + minor.class == "UInt4" || + minor.class == "Float8" || + minor.class == "Decimal28Sparse" || + minor.class == "Decimal28Dense" || + minor.class == "Decimal38Dense" || + minor.class == "Decimal38Sparse"> + <#if mode.prefix == "Repeated" > + gen.writeNumber(reader.read${friendlyType}(i)); + <#else> + gen.writeNumber(reader.read${friendlyType}()); + </#if> + <#elseif minor.class == "Date" || + minor.class == "Time" || + minor.class == "TimeStamp" || + minor.class == "TimeTZ" || + minor.class == "TimeStampTZ" || + minor.class == "IntervalDay" || + minor.class == "Interval" || + minor.class == "VarChar" || + minor.class == "Var16Char" || + minor.class == "IntervalYear"> + <#if mode.prefix == "Repeated" > + gen.writeString(reader.read${friendlyType}(i).toString()); + <#else> + gen.writeString(reader.read${friendlyType}().toString()); + </#if> + <#elseif + minor.class == "Bit"> + <#if mode.prefix == "Repeated" > + gen.writeBoolean(reader.read${friendlyType}(i)); + <#else> + gen.writeBoolean(reader.read${friendlyType}()); + </#if> + <#elseif + minor.class == "VarBinary"> + <#if mode.prefix == "Repeated" > + gen.writeBinary(reader.readByteArray(i)); + <#else> + gen.writeBinary(reader.readByteArray()); + </#if> + </#if> + <#if mode.prefix == "Repeated"> + } + gen.writeEndArray(); + </#if> + } + } + </#list> + </#list> +</#list> + +} diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java index 5284199e1..aa25d1a88 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java @@ -33,9 +33,11 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.*; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.parquet.ParquetTypeHelper; import org.apache.drill.exec.vector.*; import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import parquet.io.api.RecordConsumer; import parquet.schema.MessageType; import parquet.io.api.Binary; @@ -63,7 +65,7 @@ import java.util.Map; * * This is useful for text format writers such as CSV, TSV etc. */ -public abstract class ParquetOutputRecordWriter implements RecordWriter { +public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter implements RecordWriter { private RecordConsumer consumer; private MessageType schema; @@ -78,17 +80,30 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter { <#list type.minor as minor> <#list vv.modes as mode> @Override - public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException { + public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) { + return new ${mode.prefix}${minor.class}ParquetConverter(fieldId, fieldName, reader); + } + + public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter { + private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder(); + + public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + @Override + public void writeField() throws IOException { <#if mode.prefix == "Nullable" > - if (valueHolder.isSet == 0) { + if (!reader.isSet()) { return; } <#elseif mode.prefix == "Repeated" > // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements - if (valueHolder.start == valueHolder.end) + if (reader.size() == 0) { return; - consumer.startField(schema.getFieldName(fieldId), fieldId); - for (int i = valueHolder.start; i < valueHolder.end; i++) { + } + consumer.startField(fieldName, fieldId); + for (int i = 0; i < reader.size(); i++) { </#if> <#if minor.class == "TinyInt" || @@ -101,20 +116,24 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter { minor.class == "Decimal9" || minor.class == "UInt4"> <#if mode.prefix == "Repeated" > - consumer.addInteger(valueHolder.vector.getAccessor().get(i)); + reader.read(i, holder); + consumer.addInteger(holder.value); <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addInteger(valueHolder.value); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.startField(fieldName, fieldId); + reader.read(holder); + consumer.addInteger(holder.value); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "Float4"> <#if mode.prefix == "Repeated" > - consumer.addFloat(valueHolder.vector.getAccessor().get(i)); + reader.read(i, holder); + consumer.addFloat(holder.value); <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addFloat(valueHolder.value); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.startField(fieldName, fieldId); + reader.read(holder); + consumer.addFloat(holder.value); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "BigInt" || @@ -122,59 +141,64 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter { minor.class == "TimeStamp" || minor.class == "UInt8"> <#if mode.prefix == "Repeated" > - consumer.addLong(valueHolder.vector.getAccessor().get(i)); + reader.read(i, holder); + consumer.addLong(holder.value); <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addLong(valueHolder.value); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.startField(fieldName, fieldId); + reader.read(holder); + consumer.addLong(holder.value); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "Date"> <#if mode.prefix == "Repeated" > - consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.vector.getAccessor().get(i)) + JULIAN_DAY_EPOC)); + reader.read(i, holder); + consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC)); <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); + consumer.startField(fieldName, fieldId); + reader.read(holder); // convert from internal Drill date format to Julian Day centered around Unix Epoc - consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.value) + JULIAN_DAY_EPOC)); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC)); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "Float8"> <#if mode.prefix == "Repeated" > - consumer.addDouble(valueHolder.vector.getAccessor().get(i)); + reader.read(i, holder); + consumer.addDouble(holder.value); <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addDouble(valueHolder.value); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.startField(fieldName, fieldId); + reader.read(holder); + consumer.addDouble(holder.value); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "Bit"> <#if mode.prefix == "Repeated" > - consumer.addBoolean(valueHolder.vector.getAccessor().get(i) == 1); + reader.read(i, holder); + consumer.addBoolean(holder.value == 1); <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addBoolean(valueHolder.value == 1); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.startField(fieldName, fieldId); + consumer.addBoolean(holder.value == 1); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse"> <#if mode.prefix == "Repeated" > <#else> - consumer.startField(schema.getFieldName(fieldId), fieldId); - ${minor.class}Vector tempVec = new ${minor.class}Vector(MaterializedField.create("", TypeProtos.MajorType.getDefaultInstance()), new TopLevelAllocator()); - tempVec.allocateNew(10); - tempVec.getMutator().setSafe(0, valueHolder); + consumer.startField(fieldName, fieldId); + reader.read(holder); byte[] bytes = DecimalUtility.getBigDecimalFromSparse( - valueHolder.buffer, valueHolder.start, ${minor.class}Holder.nDecimalDigits, valueHolder.scale).unscaledValue().toByteArray(); + holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray(); byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})]; - if (valueHolder.getSign()) { + if (holder.getSign()) { Arrays.fill(output, 0, output.length - bytes.length, (byte)0xFF); } else { Arrays.fill(output, 0, output.length - bytes.length, (byte)0x0); } System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length); consumer.addBinary(Binary.fromByteArray(output)); - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.endField(fieldName, fieldId); </#if> <#elseif minor.class == "TimeTZ" || @@ -190,22 +214,23 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter { </#if> <#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary"> <#if mode.prefix == "Repeated"> - ${minor.class}Holder singleHolder = new ${minor.class}Holder(); - valueHolder.vector.getAccessor().get(i, singleHolder); - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addBinary(Binary.fromByteBuffer(singleHolder.buffer.nioBuffer(singleHolder.start, singleHolder.end - singleHolder.start))); - consumer.endField(schema.getFieldName(fieldId), fieldId); + reader.read(i, holder); + consumer.startField(fieldName, fieldId); + consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); + consumer.endField(fieldName, fieldId); <#else> - ByteBuf buf = valueHolder.buffer; - consumer.startField(schema.getFieldName(fieldId), fieldId); - consumer.addBinary(Binary.fromByteBuffer(valueHolder.buffer.nioBuffer(valueHolder.start, valueHolder.end - valueHolder.start))); - consumer.endField(schema.getFieldName(fieldId), fieldId); + reader.read(holder); + ByteBuf buf = holder.buffer; + consumer.startField(fieldName, fieldId); + consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); + consumer.endField(fieldName, fieldId); </#if> </#if> <#if mode.prefix == "Repeated"> } - consumer.endField(schema.getFieldName(fieldId), fieldId); + consumer.endField(fieldName, fieldId); </#if> + } } </#list> </#list> diff --git a/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java b/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java index d4c68171a..47197315e 100644 --- a/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java +++ b/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java @@ -62,6 +62,12 @@ public class RecordValueAccessor { return ++currentIndex < batch.getRecordCount(); } + public void getFieldById(int fieldId, ComplexHolder holder) { + holder.isSet = vectors[fieldId].getAccessor().isNull(currentIndex) ? 1 : 0; + holder.reader = (vectors[fieldId]).getAccessor().getReader(); + holder.reader.setPosition(currentIndex); + } + <#list vv.types as type> <#list type.minor as minor> <#list vv.modes as mode> diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java index 2334a1464..c6325fd0a 100644 --- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java @@ -23,6 +23,8 @@ package org.apache.drill.exec.store; import org.apache.drill.exec.expr.holders.*; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import java.io.IOException; import java.lang.UnsupportedOperationException; @@ -52,11 +54,16 @@ public interface RecordWriter { */ void startRecord() throws IOException; + /** Add the field value given in <code>valueHolder</code> at the given column number <code>fieldId</code>. */ + public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader); + public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader); + public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader); + <#list vv.types as type> <#list type.minor as minor> <#list vv.modes as mode> /** Add the field value given in <code>valueHolder</code> at the given column number <code>fieldId</code>. */ - void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException; + public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader); </#list> </#list> diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java index 9f0d70182..070ed7b55 100644 --- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java @@ -16,6 +16,9 @@ * limitations under the License. */ +import java.lang.Override; +import java.lang.UnsupportedOperationException; + <@pp.dropOutputFile /> <@pp.changeOutputFile name="org/apache/drill/exec/store/StringOutputRecordWriter.java" /> <#include "/@includes/license.ftl" /> @@ -28,7 +31,9 @@ import org.apache.drill.exec.expr.holders.*; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import java.io.IOException; import java.lang.UnsupportedOperationException; @@ -45,45 +50,65 @@ import java.util.Map; */ public abstract class StringOutputRecordWriter implements RecordWriter { - private ValueVector[] columnVectors; private final BufferAllocator allocator; protected StringOutputRecordWriter(BufferAllocator allocator){ this.allocator = allocator; } public void updateSchema(BatchSchema schema) throws IOException { - cleanupColumnVectors(); - columnVectors = new ValueVector[schema.getFieldCount()]; - List<String> columnNames = Lists.newArrayList(); - for (int i=0; i<columnVectors.length; i++) { - columnNames.add(schema.getColumn(i).getAsSchemaPath().getAsUnescapedPath()); + for (int i=0; i < schema.getFieldCount(); i++) { + columnNames.add(schema.getColumn(i).getLastName()); } startNewSchema(columnNames); + } - for (int i=0; i<columnVectors.length; i++) { - columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), allocator); - AllocationHelper.allocate(columnVectors[i], 1, TypeHelper.getSize(schema.getColumn(i).getType())); - } + @Override + public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException(); + } + public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException(); + } + public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException(); } <#list vv.types as type> <#list type.minor as minor> <#list vv.modes as mode> @Override - public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException { + public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) { + return new ${mode.prefix}${minor.class}StringFieldConverter(fieldId, fieldName, reader); + } + + public class ${mode.prefix}${minor.class}StringFieldConverter extends FieldConverter { + <#if mode.prefix == "Repeated"> + private Repeated${minor.class}Holder holder = new Repeated${minor.class}Holder(); + <#else> + private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder(); + </#if> + + public ${mode.prefix}${minor.class}StringFieldConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + @Override + public void writeField() throws IOException { <#if mode.prefix == "Nullable" > - if (valueHolder.isSet == 0) { + if (!reader.isSet()) { addField(fieldId, null); return; } <#elseif mode.prefix == "Repeated" > throw new UnsupportedOperationException("Repeated types are not supported."); + } } <#break> </#if> + reader.read(holder); <#if minor.class == "TinyInt" || minor.class == "UInt1" || minor.class == "UInt2" || @@ -94,9 +119,9 @@ public abstract class StringOutputRecordWriter implements RecordWriter { minor.class == "BigInt" || minor.class == "UInt8" || minor.class == "Float8"> - addField(fieldId, String.valueOf(valueHolder.value)); + addField(fieldId, String.valueOf(holder.value)); <#elseif minor.class == "Bit"> - addField(fieldId, valueHolder.value == 0 ? "false" : "true"); + addField(fieldId, holder.value == 0 ? "false" : "true"); <#elseif minor.class == "Date" || minor.class == "Time" || @@ -114,33 +139,21 @@ public abstract class StringOutputRecordWriter implements RecordWriter { minor.class == "Decimal38Sparse"> // TODO: error check - ((${mode.prefix}${minor.class}Vector)columnVectors[fieldId]).getMutator().setSafe(0, valueHolder); - Object obj = ((${mode.prefix}${minor.class}Vector)columnVectors[fieldId]).getAccessor().getObject(0); - addField(fieldId, obj.toString()); + addField(fieldId, reader.readObject().toString()); <#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary"> - addField(fieldId, valueHolder.toString()); + addField(fieldId, reader.readObject().toString()); <#else> throw new UnsupportedOperationException(String.format("Unsupported field type: %s"), - valueHolder.getCanonicalClass()); + holder.getCanonicalClass()); </#if> + } } </#list> </#list> </#list> public void cleanup() throws IOException { - cleanupColumnVectors(); - } - - private void cleanupColumnVectors() { - if (columnVectors != null){ - for(ValueVector vector : columnVectors){ - if(vector!=null){ - vector.clear(); - } - } - } } public abstract void startNewSchema(List<String> columnNames) throws IOException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 29b346ddd..ef4db2a78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -108,7 +108,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { // fall through. case OK: try { - counter += eventBasedRecordWriter.write(); + counter += eventBasedRecordWriter.write(incoming.getRecordCount()); logger.debug("Total records written so far: {}", counter); } catch(IOException ex) { throw new RuntimeException(ex); @@ -162,8 +162,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { stats.stopSetup(); } - eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(), - new RecordValueAccessor(incoming), recordWriter); + eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java index a2d22cfe4..540977dec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java @@ -50,6 +50,10 @@ public class MaterializedField { .setNamePart(key.path.getAsNamePart()); } + public List<MaterializedField> getChildren() { + return children; + } + public void addChild(MaterializedField field){ children.add(field); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 7fbb9c7a6..8b5577c06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -19,20 +19,24 @@ package org.apache.drill.exec.store.easy.json; import java.io.IOException; import java.util.List; +import java.util.Map; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; +import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; import org.apache.drill.exec.store.dfs.easy.EasyWriter; import org.apache.drill.exec.store.dfs.easy.FileWork; @@ -40,6 +44,8 @@ import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.exec.store.text.DrillTextRecordWriter; +import org.apache.hadoop.fs.FileSystem; public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { @@ -59,7 +65,23 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { @Override public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException { - throw new UnsupportedOperationException("Json Writer is not supported currently."); + Map<String, String> options = Maps.newHashMap(); + + options.put("location", writer.getLocation()); + + FragmentHandle handle = context.getHandle(); + String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId()); + options.put("prefix", fragmentId); + + options.put("separator", " "); + options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection); + + options.put("extension", "json"); + + RecordWriter recordWriter = new JsonRecordWriter(); + recordWriter.init(options); + + return recordWriter; } @JsonTypeName("json") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java new file mode 100644 index 000000000..da9f48b77 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.json; + +import com.fasterxml.jackson.core.JsonFactory; +import com.google.common.collect.Lists; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.store.EventBasedRecordWriter; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; +import org.apache.drill.exec.store.JSONOutputRecordWriter; +import org.apache.drill.exec.store.RecordWriter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWriter { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class); + + private String location; + private String prefix; + + private String fieldDelimiter; + private String extension; + + private int index; + private FileSystem fs = null; + private FSDataOutputStream stream = null; + + private final JsonFactory factory = new JsonFactory(); + + // Record write status + private boolean fRecordStarted = false; // true once the startRecord() is called until endRecord() is called + + public JsonRecordWriter(){ + } + + @Override + public void init(Map<String, String> writerOptions) throws IOException { + this.location = writerOptions.get("location"); + this.prefix = writerOptions.get("prefix"); + this.fieldDelimiter = writerOptions.get("separator"); + this.extension = writerOptions.get("extension"); + + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY)); + this.fs = FileSystem.get(conf); + + Path fileName = new Path(location, prefix + "_" + index + "." + extension); + try { + stream = fs.create(fileName); + gen = factory.createGenerator(stream).useDefaultPrettyPrinter(); + logger.debug("Created file: {}", fileName); + } catch (IOException ex) { + logger.error("Unable to create file: " + fileName, ex); + throw ex; + } + } + + @Override + public void updateSchema(BatchSchema schema) throws IOException { + // no op + } + + @Override + public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) { + return new MapJsonConverter(fieldId, fieldName, reader); + } + + public class MapJsonConverter extends FieldConverter { + List<FieldConverter> converters = Lists.newArrayList(); + + public MapJsonConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + int i = 0; + for (String name : reader) { + FieldConverter converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, i++, name, reader.reader(name)); + converters.add(converter); + } + } + + @Override + public void startField() throws IOException { + gen.writeFieldName(fieldName); + } + + @Override + public void writeField() throws IOException { + gen.writeStartObject(); + for (FieldConverter converter : converters) { + converter.startField(); + converter.writeField(); + } + gen.writeEndObject(); + } + } + + @Override + public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { + return new RepeatedMapJsonConverter(fieldId, fieldName, reader); + } + + public class RepeatedMapJsonConverter extends FieldConverter { + List<FieldConverter> converters = Lists.newArrayList(); + + public RepeatedMapJsonConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + int i = 0; + for (String name : reader) { + FieldConverter converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, i++, name, reader.reader(name)); + converters.add(converter); + } + } + + @Override + public void startField() throws IOException { + gen.writeFieldName(fieldName); + } + + @Override + public void writeField() throws IOException { + gen.writeStartArray(); + while (reader.next()) { + gen.writeStartObject(); + for (FieldConverter converter : converters) { + converter.startField(); + converter.writeField(); + } + gen.writeEndObject(); + } + gen.writeEndArray(); + } + } + + @Override + public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) { + return new RepeatedListJsonConverter(fieldId, fieldName, reader); + } + + public class RepeatedListJsonConverter extends FieldConverter { + FieldConverter converter; + + public RepeatedListJsonConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, fieldId, fieldName, reader.reader()); + } + + @Override + public void startField() throws IOException { + gen.writeFieldName(fieldName); + } + + @Override + public void writeField() throws IOException { + gen.writeStartArray(); + while (reader.next()) { + converter.writeField(); + } + gen.writeEndArray(); + } + } + + @Override + public void startRecord() throws IOException { + gen.writeStartObject(); + fRecordStarted = true; + } + + @Override + public void endRecord() throws IOException { + gen.writeEndObject(); + fRecordStarted = false; + } + + @Override + public void abort() throws IOException { + } + + @Override + public void cleanup() throws IOException { + gen.flush(); + stream.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index a3363168b..94ccc133a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -18,11 +18,18 @@ package org.apache.drill.exec.store.parquet; import com.google.common.collect.Lists; +import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.EventBasedRecordWriter; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.ParquetOutputRecordWriter; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -36,8 +43,10 @@ import parquet.io.ColumnIOFactory; import parquet.io.MessageColumnIO; import parquet.io.api.RecordConsumer; import parquet.schema.DecimalMetadata; +import parquet.schema.GroupType; import parquet.schema.MessageType; import parquet.schema.OriginalType; +import parquet.schema.PrimitiveType; import parquet.schema.PrimitiveType.PrimitiveTypeName; import parquet.schema.Type; import parquet.schema.Type.Repetition; @@ -107,15 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { private void newSchema() throws IOException { List<Type> types = Lists.newArrayList(); for (MaterializedField field : batchSchema) { - String name = field.getAsSchemaPath().getAsUnescapedPath(); - MinorType minorType = field.getType().getMinorType(); - PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType); - Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode()); - OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType); - DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field); - int length = ParquetTypeHelper.getLengthForMinorType(minorType); - parquet.schema.Type type = new parquet.schema.PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata); - types.add(type); + types.add(getType(field)); } schema = new MessageType("root", types); @@ -132,6 +133,34 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { setUp(schema, consumer); } + private PrimitiveType getPrimitiveType(MaterializedField field) { + MinorType minorType = field.getType().getMinorType(); + String name = field.getLastName(); + PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType); + Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode()); + OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType); + DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field); + int length = ParquetTypeHelper.getLengthForMinorType(minorType); + return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata); + } + + private parquet.schema.Type getType(MaterializedField field) { + MinorType minorType = field.getType().getMinorType(); + DataMode dataMode = field.getType().getMode(); + switch(minorType) { + case MAP: + List<parquet.schema.Type> types = Lists.newArrayList(); + for (MaterializedField childField : field.getChildren()) { + types.add(getType(childField)); + } + return new GroupType(dataMode == DataMode.REPEATED ? Repetition.REPEATED : Repetition.OPTIONAL, field.getLastName(), types); + case LIST: + throw new UnsupportedOperationException("Unsupported type " + minorType); + default: + return getPrimitiveType(field); + } + } + private void flush() throws IOException { w.startBlock(recordCount); store.flush(); @@ -163,6 +192,70 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { } @Override + public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) { + return new MapParquetConverter(fieldId, fieldName, reader); + } + + public class MapParquetConverter extends FieldConverter { + List<FieldConverter> converters = Lists.newArrayList(); + + public MapParquetConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + int i = 0; + for (String name : reader) { + FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name)); + converters.add(converter); + } + } + + @Override + public void writeField() throws IOException { + consumer.startField(fieldName, fieldId); + consumer.startGroup(); + for (FieldConverter converter : converters) { + converter.writeField(); + } + consumer.endGroup(); + consumer.endField(fieldName, fieldId); + } + } + + @Override + public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { + return new RepeatedMapParquetConverter(fieldId, fieldName, reader); + } + + public class RepeatedMapParquetConverter extends FieldConverter { + List<FieldConverter> converters = Lists.newArrayList(); + + public RepeatedMapParquetConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + int i = 0; + for (String name : reader) { + FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name)); + converters.add(converter); + } + } + + @Override + public void writeField() throws IOException { + if (reader.size() == 0) { + return; + } + consumer.startField(fieldName, fieldId); + while (reader.next()) { + consumer.startGroup(); + for (FieldConverter converter : converters) { + converter.writeField(); + } + consumer.endGroup(); + } + consumer.endField(fieldName, fieldId); + } + } + + + @Override public void startRecord() throws IOException { consumer.startMessage(); } @@ -176,7 +269,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { @Override public void abort() throws IOException { - //To change body of implemented methods use File | Settings | File Templates. } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java index 55f2b7244..23d95b8c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java @@ -23,8 +23,11 @@ import java.io.PrintStream; import java.util.List; import java.util.Map; +import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter; import org.apache.drill.exec.store.StringOutputRecordWriter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -117,6 +120,33 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter { } @Override + public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) { + return new ComplexStringFieldConverter(fieldId, fieldName, reader); + } + + @Override + public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { + return new ComplexStringFieldConverter(fieldId, fieldName, reader); + } + + @Override + public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) { + return new ComplexStringFieldConverter(fieldId, fieldName, reader); + } + + public class ComplexStringFieldConverter extends FieldConverter { + + public ComplexStringFieldConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + @Override + public void writeField() throws IOException { + addField(fieldId, reader.readObject().toString()); + } + } + + @Override public void cleanup() throws IOException { super.cleanup(); if (stream != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index cb770321d..c67c04743 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.RepeatedFixedWidthVector; import org.apache.drill.exec.vector.UInt4Vector; @@ -79,6 +80,9 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat clear(); offsets.allocateNew(parentValueCount+1); offsets.zeroVector(); + for(ValueVector v : vectors.values()){ + AllocationHelper.allocate(v, parentValueCount, 50, childValueCount); + } mutator.reset(); accessor.reset(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java index 53506625b..b89d26d34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java @@ -76,7 +76,10 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{ private int maxOffset; public int size(){ - return maxOffset - currentOffset; + if (isNull()) { + return 0; + } + return maxOffset - (currentOffset < 0 ? 0 : currentOffset); } public void setPosition(int index){ |