author    Steven Phillips <sphillips@maprtech.com>  2014-07-31 23:46:26 -0700
committer Jacques Nadeau <jacques@apache.org>       2014-08-06 16:44:23 -0700
commit    49d238eea601a6764129b9d64cf9bca440209e12 (patch)
tree      6b9c7d344bbf248e0f1c66ca5f37fbcc3f92e8d9 /exec/java-exec/src/main
parent    450d891eb9cc9a77e537537e5d70ecb94de697b8 (diff)
DRILL-1252: Implement complex Parquet and JSON writers
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--  exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java                                21
-rw-r--r--  exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java                             128
-rw-r--r--  exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java                             171
-rw-r--r--  exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java                          119
-rw-r--r--  exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java                                 6
-rw-r--r--  exec/java-exec/src/main/codegen/templates/RecordWriter.java                                        9
-rw-r--r--  exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java                            73
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java            5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java                   4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java           24
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java           205
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java          112
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java           30
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java           4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java  5
15 files changed, 763 insertions, 153 deletions
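
In outline, the commit replaces the per-type add<Mode><Type>Holder(fieldId, holder) callbacks on RecordWriter with FieldConverter objects that pull values through a FieldReader, which is what allows map, repeated map, and repeated list columns to be written. A simplified sketch of the converter contract, condensed from the EventBasedRecordWriter template below (not the generated source):

    import java.io.IOException;
    import org.apache.drill.exec.vector.complex.reader.FieldReader;

    public abstract class FieldConverter {
      protected int fieldId;
      protected String fieldName;
      protected FieldReader reader;

      public FieldConverter(int fieldId, String fieldName, FieldReader reader) {
        this.fieldId = fieldId;
        this.fieldName = fieldName;
        this.reader = reader;
      }

      // Point the underlying reader at the row about to be written.
      public void setPosition(int index) {
        reader.setPosition(index);
      }

      // Format-specific hooks: the JSON writer emits the field name here,
      // the Parquet writer brackets values with startField/endField calls.
      public void startField() throws IOException { }
      public void endField() throws IOException { }

      public abstract void writeField() throws IOException;
    }
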
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
index 6a7fb8623..6b6065f6b 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java
@@ -23,18 +23,35 @@
package org.apache.drill.exec.store;
import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
public abstract class AbstractRecordWriter implements RecordWriter {
+ @Override
+ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException("Doesn't support writing Map'");
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException("Doesn't support writing RepeatedMap");
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException("Doesn't support writing RepeatedList");
+ }
+
<#list vv.types as type>
<#list type.minor as minor>
<#list vv.modes as mode>
@Override
- public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException {
- throw new UnsupportedOperationException("Doesn't support writing '${mode.prefix}${minor.class}Holder'");
+ public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException("Doesn't support writing '${mode.prefix}${minor.class}'");
}
</#list>
</#list>
diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
index b58f24cf9..e76178a59 100644
--- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java
@@ -25,11 +25,9 @@ package org.apache.drill.exec.store;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.expr.holders.*;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordValueAccessor;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import java.io.IOException;
import java.util.List;
@@ -40,61 +38,44 @@ import java.util.Map;
public class EventBasedRecordWriter {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EventBasedRecordWriter.class);
- private BatchSchema schema;
- private RecordValueAccessor rva;
+ private VectorAccessible batch;
private RecordWriter recordWriter;
- private List<FieldWriter> fieldWriters;
+ private List<FieldConverter> fieldConverters;
- static private Map<MajorType, Class<? extends FieldWriter>> typeClassMap;
-
- static {
- typeClassMap = Maps.newHashMap();
-
-<#list vv.types as type>
- <#list type.minor as minor>
- <#list vv.modes as mode>
- typeClassMap.put(${mode.prefix}${minor.class}Holder.TYPE, ${mode.prefix}${minor.class}FieldWriter.class);
- </#list>
- </#list>
-</#list>
- }
-
- public EventBasedRecordWriter(BatchSchema schema, RecordValueAccessor rva, RecordWriter recordWriter)
- throws IOException {
- this.schema = schema;
- this.rva = rva;
+ public EventBasedRecordWriter(VectorAccessible batch, RecordWriter recordWriter)
+ throws IOException {
+ this.batch = batch;
this.recordWriter = recordWriter;
initFieldWriters();
}
- public int write() throws IOException {
+ public int write(int recordCount) throws IOException {
int counter = 0;
- rva.resetIterator();
- while(rva.next()) {
+ for (; counter < recordCount; counter++) {
recordWriter.startRecord();
// write the current record
- int fieldId = 0;
- for (MaterializedField field : schema) {
- fieldWriters.get(fieldId).writeField();
- fieldId++;
+ for (FieldConverter converter : fieldConverters) {
+ converter.setPosition(counter);
+ converter.startField();
+ converter.writeField();
+ converter.endField();
}
recordWriter.endRecord();
- counter++;
}
return counter;
}
private void initFieldWriters() throws IOException {
- fieldWriters = Lists.newArrayList();
+ fieldConverters = Lists.newArrayList();
try {
- for (int i = 0; i < schema.getFieldCount(); i++) {
- MajorType mt = schema.getColumn(i).getType();
- MajorType newMt = MajorType.newBuilder().setMinorType(mt.getMinorType()).setMode(mt.getMode()).build();
- fieldWriters.add(i, typeClassMap.get(newMt)
- .getConstructor(EventBasedRecordWriter.class, int.class).newInstance(this, i));
+ int fieldId = 0;
+ for (VectorWrapper w : batch) {
+ FieldReader reader = w.getValueVector().getAccessor().getReader();
+ FieldConverter converter = getConverter(recordWriter, fieldId++, w.getField().getLastName(), reader);
+ fieldConverters.add(converter);
}
} catch(Exception e) {
logger.error("Failed to create FieldWriter.", e);
@@ -102,33 +83,64 @@ public class EventBasedRecordWriter {
}
}
- abstract class FieldWriter {
+ public static abstract class FieldConverter {
protected int fieldId;
+ protected String fieldName;
+ protected FieldReader reader;
- public FieldWriter(int fieldId) {
+ public FieldConverter(int fieldId, String fieldName, FieldReader reader) {
this.fieldId = fieldId;
+ this.fieldName = fieldName;
+ this.reader = reader;
}
- public abstract void writeField() throws IOException;
- }
-
-<#list vv.types as type>
- <#list type.minor as minor>
- <#list vv.modes as mode>
- class ${mode.prefix}${minor.class}FieldWriter extends FieldWriter {
- private ${mode.prefix}${minor.class}Holder holder = new ${mode.prefix}${minor.class}Holder();
+ public void setPosition(int index) {
+ reader.setPosition(index);
+ }
- public ${mode.prefix}${minor.class}FieldWriter(int fieldId) {
- super(fieldId);
+ public void startField() throws IOException {
+ // no op
}
- public void writeField() throws IOException {
- rva.getFieldById(fieldId, holder);
- recordWriter.add${mode.prefix}${minor.class}Holder(fieldId, holder);
+ public void endField() throws IOException {
+ // no op
}
+
+ public abstract void writeField() throws IOException;
}
- </#list>
- </#list>
-</#list>
+ public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) {
+ switch (reader.getType().getMinorType()) {
+ case MAP:
+ switch (reader.getType().getMode()) {
+ case REQUIRED:
+ case OPTIONAL:
+ return recordWriter.getNewMapConverter(fieldId, fieldName, reader);
+ case REPEATED:
+ return recordWriter.getNewRepeatedMapConverter(fieldId, fieldName, reader);
+ }
+
+ case LIST:
+ switch (reader.getType().getMode()) {
+ case REPEATED:
+ return recordWriter.getNewRepeatedListConverter(fieldId, fieldName, reader);
+ }
+
+ <#list vv.types as type>
+ <#list type.minor as minor>
+ case ${minor.class?upper_case}:
+ switch (reader.getType().getMode()) {
+ case REQUIRED:
+ return recordWriter.getNew${minor.class}Converter(fieldId, fieldName, reader);
+ case OPTIONAL:
+ return recordWriter.getNewNullable${minor.class}Converter(fieldId, fieldName, reader);
+ case REPEATED:
+ return recordWriter.getNewRepeated${minor.class}Converter(fieldId, fieldName, reader);
+ }
+ </#list>
+ </#list>
+
+ }
+ throw new UnsupportedOperationException();
+ }
}
\ No newline at end of file
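
With the converters in place, the driver loop shrinks to roughly the sketch below; `incoming` (the upstream RecordBatch) and the initialized RecordWriter come from WriterRecordBatch, shown later in this diff:

    // Sketch of the new reader-driven write path.
    EventBasedRecordWriter ebw = new EventBasedRecordWriter(incoming, recordWriter);
    int written = ebw.write(incoming.getRecordCount());
    // write(n) positions every FieldConverter at rows 0..n-1 and emits the
    // startRecord / startField / writeField / endField / endRecord events.
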
diff --git a/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java
new file mode 100644
index 000000000..d1a6d4e3b
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/JsonOutputRecordWriter.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.joda.time.DateTimeUtils;
+import parquet.io.api.Binary;
+
+import java.lang.Override;
+import java.lang.RuntimeException;
+
+<@pp.dropOutputFile />
+<@pp.changeOutputFile name="org/apache/drill/exec/store/JSONOutputRecordWriter.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.parquet.ParquetTypeHelper;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.io.api.Binary;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+
+
+import org.apache.drill.common.types.TypeProtos;
+
+import org.joda.time.DateTimeUtils;
+
+import java.io.IOException;
+import java.lang.UnsupportedOperationException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract implementation of RecordWriter which holds a Jackson
+ * {@link com.fasterxml.jackson.core.JsonGenerator} and provides generated
+ * FieldConverter implementations that serialize each supported type as JSON.
+ */
+public abstract class JSONOutputRecordWriter extends AbstractRecordWriter implements RecordWriter {
+
+ protected JsonGenerator gen;
+
+<#list vv.types as type>
+ <#list type.minor as minor>
+ <#list vv.modes as mode>
+ <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) />
+ @Override
+ public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+ return new ${mode.prefix}${minor.class}JsonConverter(fieldId, fieldName, reader);
+ }
+
+ public class ${mode.prefix}${minor.class}JsonConverter extends FieldConverter {
+
+ public ${mode.prefix}${minor.class}JsonConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public void startField() throws IOException {
+ gen.writeFieldName(fieldName);
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ <#if mode.prefix == "Nullable" >
+ if (!reader.isSet()) {
+ gen.writeNull();
+ return;
+ }
+ <#elseif mode.prefix == "Repeated" >
+ // empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
+ if (reader.size() == 0) {
+ return;
+ }
+ gen.writeStartArray();
+ for (int i = 0; i < reader.size(); i++) {
+ <#else>
+ </#if>
+
+ <#if minor.class == "TinyInt" ||
+ minor.class == "UInt1" ||
+ minor.class == "UInt2" ||
+ minor.class == "SmallInt" ||
+ minor.class == "Int" ||
+ minor.class == "Decimal9" ||
+ minor.class == "Float4" ||
+ minor.class == "BigInt" ||
+ minor.class == "Decimal18" ||
+ minor.class == "UInt8" ||
+ minor.class == "UInt4" ||
+ minor.class == "Float8" ||
+ minor.class == "Decimal28Sparse" ||
+ minor.class == "Decimal28Dense" ||
+ minor.class == "Decimal38Dense" ||
+ minor.class == "Decimal38Sparse">
+ <#if mode.prefix == "Repeated" >
+ gen.writeNumber(reader.read${friendlyType}(i));
+ <#else>
+ gen.writeNumber(reader.read${friendlyType}());
+ </#if>
+ <#elseif minor.class == "Date" ||
+ minor.class == "Time" ||
+ minor.class == "TimeStamp" ||
+ minor.class == "TimeTZ" ||
+ minor.class == "TimeStampTZ" ||
+ minor.class == "IntervalDay" ||
+ minor.class == "Interval" ||
+ minor.class == "VarChar" ||
+ minor.class == "Var16Char" ||
+ minor.class == "IntervalYear">
+ <#if mode.prefix == "Repeated" >
+ gen.writeString(reader.read${friendlyType}(i).toString());
+ <#else>
+ gen.writeString(reader.read${friendlyType}().toString());
+ </#if>
+ <#elseif
+ minor.class == "Bit">
+ <#if mode.prefix == "Repeated" >
+ gen.writeBoolean(reader.read${friendlyType}(i));
+ <#else>
+ gen.writeBoolean(reader.read${friendlyType}());
+ </#if>
+ <#elseif
+ minor.class == "VarBinary">
+ <#if mode.prefix == "Repeated" >
+ gen.writeBinary(reader.readByteArray(i));
+ <#else>
+ gen.writeBinary(reader.readByteArray());
+ </#if>
+ </#if>
+ <#if mode.prefix == "Repeated">
+ }
+ gen.writeEndArray();
+ </#if>
+ }
+ }
+ </#list>
+ </#list>
+</#list>
+
+}
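
Because this file is a FreeMarker template expanded at build time, one concrete instantiation may help. A hand-expanded sketch for mode=Nullable, minor=BigInt (the generated source may differ in detail; `gen` is the JsonGenerator field of the enclosing JSONOutputRecordWriter):

    public class NullableBigIntJsonConverter extends FieldConverter {

      public NullableBigIntJsonConverter(int fieldId, String fieldName, FieldReader reader) {
        super(fieldId, fieldName, reader);
      }

      @Override
      public void startField() throws IOException {
        gen.writeFieldName(fieldName);
      }

      @Override
      public void writeField() throws IOException {
        if (!reader.isSet()) {
          gen.writeNull();     // nullable mode: unset values become JSON null
          return;
        }
        gen.writeNumber(reader.readLong());   // BigInt's friendly type is Long
      }
    }
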
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 5284199e1..aa25d1a88 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -33,9 +33,11 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.parquet.ParquetTypeHelper;
import org.apache.drill.exec.vector.*;
import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.io.api.Binary;
@@ -63,7 +65,7 @@ import java.util.Map;
*
* This is useful for text format writers such as CSV, TSV etc.
*/
-public abstract class ParquetOutputRecordWriter implements RecordWriter {
+public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter implements RecordWriter {
private RecordConsumer consumer;
private MessageType schema;
@@ -78,17 +80,30 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
<#list type.minor as minor>
<#list vv.modes as mode>
@Override
- public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException {
+ public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+ return new ${mode.prefix}${minor.class}ParquetConverter(fieldId, fieldName, reader);
+ }
+
+ public class ${mode.prefix}${minor.class}ParquetConverter extends FieldConverter {
+ private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
+
+ public ${mode.prefix}${minor.class}ParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public void writeField() throws IOException {
<#if mode.prefix == "Nullable" >
- if (valueHolder.isSet == 0) {
+ if (!reader.isSet()) {
return;
}
<#elseif mode.prefix == "Repeated" >
// empty lists are represented by simply not starting a field, rather than starting one and putting in 0 elements
- if (valueHolder.start == valueHolder.end)
+ if (reader.size() == 0) {
return;
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- for (int i = valueHolder.start; i < valueHolder.end; i++) {
+ }
+ consumer.startField(fieldName, fieldId);
+ for (int i = 0; i < reader.size(); i++) {
</#if>
<#if minor.class == "TinyInt" ||
@@ -101,20 +116,24 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
minor.class == "Decimal9" ||
minor.class == "UInt4">
<#if mode.prefix == "Repeated" >
- consumer.addInteger(valueHolder.vector.getAccessor().get(i));
+ reader.read(i, holder);
+ consumer.addInteger(holder.value);
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addInteger(valueHolder.value);
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.startField(fieldName, fieldId);
+ reader.read(holder);
+ consumer.addInteger(holder.value);
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif
minor.class == "Float4">
<#if mode.prefix == "Repeated" >
- consumer.addFloat(valueHolder.vector.getAccessor().get(i));
+ reader.read(i, holder);
+ consumer.addFloat(holder.value);
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addFloat(valueHolder.value);
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.startField(fieldName, fieldId);
+ reader.read(holder);
+ consumer.addFloat(holder.value);
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif
minor.class == "BigInt" ||
@@ -122,59 +141,64 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
minor.class == "TimeStamp" ||
minor.class == "UInt8">
<#if mode.prefix == "Repeated" >
- consumer.addLong(valueHolder.vector.getAccessor().get(i));
+ reader.read(i, holder);
+ consumer.addLong(holder.value);
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addLong(valueHolder.value);
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.startField(fieldName, fieldId);
+ reader.read(holder);
+ consumer.addLong(holder.value);
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif minor.class == "Date">
<#if mode.prefix == "Repeated" >
- consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.vector.getAccessor().get(i)) + JULIAN_DAY_EPOC));
+ reader.read(i, holder);
+ consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC));
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
+ consumer.startField(fieldName, fieldId);
+ reader.read(holder);
// convert from internal Drill date format to Julian Day centered around Unix Epoc
- consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(valueHolder.value) + JULIAN_DAY_EPOC));
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) + JULIAN_DAY_EPOC));
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif
minor.class == "Float8">
<#if mode.prefix == "Repeated" >
- consumer.addDouble(valueHolder.vector.getAccessor().get(i));
+ reader.read(i, holder);
+ consumer.addDouble(holder.value);
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addDouble(valueHolder.value);
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.startField(fieldName, fieldId);
+ reader.read(holder);
+ consumer.addDouble(holder.value);
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif
minor.class == "Bit">
<#if mode.prefix == "Repeated" >
- consumer.addBoolean(valueHolder.vector.getAccessor().get(i) == 1);
+ reader.read(i, holder);
+ consumer.addBoolean(holder.value == 1);
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addBoolean(valueHolder.value == 1);
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.startField(fieldName, fieldId);
+ consumer.addBoolean(holder.value == 1);
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif
minor.class == "Decimal28Sparse" ||
minor.class == "Decimal38Sparse">
<#if mode.prefix == "Repeated" >
<#else>
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- ${minor.class}Vector tempVec = new ${minor.class}Vector(MaterializedField.create("", TypeProtos.MajorType.getDefaultInstance()), new TopLevelAllocator());
- tempVec.allocateNew(10);
- tempVec.getMutator().setSafe(0, valueHolder);
+ consumer.startField(fieldName, fieldId);
+ reader.read(holder);
byte[] bytes = DecimalUtility.getBigDecimalFromSparse(
- valueHolder.buffer, valueHolder.start, ${minor.class}Holder.nDecimalDigits, valueHolder.scale).unscaledValue().toByteArray();
+ holder.buffer, holder.start, ${minor.class}Holder.nDecimalDigits, holder.scale).unscaledValue().toByteArray();
byte[] output = new byte[ParquetTypeHelper.getLengthForMinorType(MinorType.${minor.class?upper_case})];
- if (valueHolder.getSign()) {
+ if (holder.getSign()) {
Arrays.fill(output, 0, output.length - bytes.length, (byte)0xFF);
} else {
Arrays.fill(output, 0, output.length - bytes.length, (byte)0x0);
}
System.arraycopy(bytes, 0, output, output.length - bytes.length, bytes.length);
consumer.addBinary(Binary.fromByteArray(output));
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.endField(fieldName, fieldId);
</#if>
<#elseif
minor.class == "TimeTZ" ||
@@ -190,22 +214,23 @@ public abstract class ParquetOutputRecordWriter implements RecordWriter {
</#if>
<#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary">
<#if mode.prefix == "Repeated">
- ${minor.class}Holder singleHolder = new ${minor.class}Holder();
- valueHolder.vector.getAccessor().get(i, singleHolder);
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addBinary(Binary.fromByteBuffer(singleHolder.buffer.nioBuffer(singleHolder.start, singleHolder.end - singleHolder.start)));
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ reader.read(i, holder);
+ consumer.startField(fieldName, fieldId);
+ consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
+ consumer.endField(fieldName, fieldId);
<#else>
- ByteBuf buf = valueHolder.buffer;
- consumer.startField(schema.getFieldName(fieldId), fieldId);
- consumer.addBinary(Binary.fromByteBuffer(valueHolder.buffer.nioBuffer(valueHolder.start, valueHolder.end - valueHolder.start)));
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ reader.read(holder);
+ ByteBuf buf = holder.buffer;
+ consumer.startField(fieldName, fieldId);
+ consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
+ consumer.endField(fieldName, fieldId);
</#if>
</#if>
<#if mode.prefix == "Repeated">
}
- consumer.endField(schema.getFieldName(fieldId), fieldId);
+ consumer.endField(fieldName, fieldId);
</#if>
+ }
}
</#list>
</#list>
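
For comparison, a hand-expanded sketch of this template for a required BigInt column (the generated source may differ in detail; `consumer` is the RecordConsumer field of the enclosing ParquetOutputRecordWriter):

    public class BigIntParquetConverter extends FieldConverter {
      private NullableBigIntHolder holder = new NullableBigIntHolder();

      public BigIntParquetConverter(int fieldId, String fieldName, FieldReader reader) {
        super(fieldId, fieldName, reader);
      }

      @Override
      public void writeField() throws IOException {
        consumer.startField(fieldName, fieldId);
        reader.read(holder);               // copy the current row's value into the holder
        consumer.addLong(holder.value);    // BigInt maps to Parquet INT64
        consumer.endField(fieldName, fieldId);
      }
    }
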
diff --git a/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java b/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
index d4c68171a..47197315e 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordValueAccessor.java
@@ -62,6 +62,12 @@ public class RecordValueAccessor {
return ++currentIndex < batch.getRecordCount();
}
+ public void getFieldById(int fieldId, ComplexHolder holder) {
+ holder.isSet = vectors[fieldId].getAccessor().isNull(currentIndex) ? 1 : 0;
+ holder.reader = (vectors[fieldId]).getAccessor().getReader();
+ holder.reader.setPosition(currentIndex);
+ }
+
<#list vv.types as type>
<#list type.minor as minor>
<#list vv.modes as mode>
diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
index 2334a1464..c6325fd0a 100644
--- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java
@@ -23,6 +23,8 @@ package org.apache.drill.exec.store;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
@@ -52,11 +54,16 @@ public interface RecordWriter {
*/
void startRecord() throws IOException;
+ /** Create a FieldConverter for the given complex field (Map, RepeatedMap, or RepeatedList). */
+ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader);
+ public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader);
+ public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader);
+
<#list vv.types as type>
<#list type.minor as minor>
<#list vv.modes as mode>
/** Add the field value given in <code>valueHolder</code> at the given column number <code>fieldId</code>. */
- void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException;
+ public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader);
</#list>
</#list>
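
Expanded, the interface now carries one factory method per (mode, minor type) pair plus the three complex-type factories; a representative excerpt (a sketch, not the full generated file):

    public interface RecordWriter {
      FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader);
      FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader);
      FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader);

      // ...one method per (mode, minor type) pair, for example:
      FieldConverter getNewBigIntConverter(int fieldId, String fieldName, FieldReader reader);
      FieldConverter getNewNullableBigIntConverter(int fieldId, String fieldName, FieldReader reader);
      FieldConverter getNewRepeatedBigIntConverter(int fieldId, String fieldName, FieldReader reader);
    }
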
diff --git a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
index 9f0d70182..070ed7b55 100644
--- a/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/StringOutputRecordWriter.java
@@ -16,6 +16,9 @@
* limitations under the License.
*/
+import java.lang.Override;
+import java.lang.UnsupportedOperationException;
+
<@pp.dropOutputFile />
<@pp.changeOutputFile name="org/apache/drill/exec/store/StringOutputRecordWriter.java" />
<#include "/@includes/license.ftl" />
@@ -28,7 +31,9 @@ import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import java.io.IOException;
import java.lang.UnsupportedOperationException;
@@ -45,45 +50,65 @@ import java.util.Map;
*/
public abstract class StringOutputRecordWriter implements RecordWriter {
- private ValueVector[] columnVectors;
private final BufferAllocator allocator;
protected StringOutputRecordWriter(BufferAllocator allocator){
this.allocator = allocator;
}
public void updateSchema(BatchSchema schema) throws IOException {
- cleanupColumnVectors();
- columnVectors = new ValueVector[schema.getFieldCount()];
-
List<String> columnNames = Lists.newArrayList();
- for (int i=0; i<columnVectors.length; i++) {
- columnNames.add(schema.getColumn(i).getAsSchemaPath().getAsUnescapedPath());
+ for (int i=0; i < schema.getFieldCount(); i++) {
+ columnNames.add(schema.getColumn(i).getLastName());
}
startNewSchema(columnNames);
+ }
- for (int i=0; i<columnVectors.length; i++) {
- columnVectors[i] = TypeHelper.getNewVector(schema.getColumn(i), allocator);
- AllocationHelper.allocate(columnVectors[i], 1, TypeHelper.getSize(schema.getColumn(i).getType()));
- }
+ @Override
+ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException();
+ }
+ public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException();
+ }
+ public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+ throw new UnsupportedOperationException();
}
<#list vv.types as type>
<#list type.minor as minor>
<#list vv.modes as mode>
@Override
- public void add${mode.prefix}${minor.class}Holder(int fieldId, ${mode.prefix}${minor.class}Holder valueHolder) throws IOException {
+ public FieldConverter getNew${mode.prefix}${minor.class}Converter(int fieldId, String fieldName, FieldReader reader) {
+ return new ${mode.prefix}${minor.class}StringFieldConverter(fieldId, fieldName, reader);
+ }
+
+ public class ${mode.prefix}${minor.class}StringFieldConverter extends FieldConverter {
+ <#if mode.prefix == "Repeated">
+ private Repeated${minor.class}Holder holder = new Repeated${minor.class}Holder();
+ <#else>
+ private Nullable${minor.class}Holder holder = new Nullable${minor.class}Holder();
+ </#if>
+
+ public ${mode.prefix}${minor.class}StringFieldConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public void writeField() throws IOException {
<#if mode.prefix == "Nullable" >
- if (valueHolder.isSet == 0) {
+ if (!reader.isSet()) {
addField(fieldId, null);
return;
}
<#elseif mode.prefix == "Repeated" >
throw new UnsupportedOperationException("Repeated types are not supported.");
+ }
}
<#break>
</#if>
+ reader.read(holder);
<#if minor.class == "TinyInt" ||
minor.class == "UInt1" ||
minor.class == "UInt2" ||
@@ -94,9 +119,9 @@ public abstract class StringOutputRecordWriter implements RecordWriter {
minor.class == "BigInt" ||
minor.class == "UInt8" ||
minor.class == "Float8">
- addField(fieldId, String.valueOf(valueHolder.value));
+ addField(fieldId, String.valueOf(holder.value));
<#elseif minor.class == "Bit">
- addField(fieldId, valueHolder.value == 0 ? "false" : "true");
+ addField(fieldId, holder.value == 0 ? "false" : "true");
<#elseif
minor.class == "Date" ||
minor.class == "Time" ||
@@ -114,33 +139,21 @@ public abstract class StringOutputRecordWriter implements RecordWriter {
minor.class == "Decimal38Sparse">
// TODO: error check
- ((${mode.prefix}${minor.class}Vector)columnVectors[fieldId]).getMutator().setSafe(0, valueHolder);
- Object obj = ((${mode.prefix}${minor.class}Vector)columnVectors[fieldId]).getAccessor().getObject(0);
- addField(fieldId, obj.toString());
+ addField(fieldId, reader.readObject().toString());
<#elseif minor.class == "VarChar" || minor.class == "Var16Char" || minor.class == "VarBinary">
- addField(fieldId, valueHolder.toString());
+ addField(fieldId, reader.readObject().toString());
<#else>
-      throw new UnsupportedOperationException(String.format("Unsupported field type: %s"),
-      valueHolder.getCanonicalClass());
+      throw new UnsupportedOperationException(String.format("Unsupported field type: %s",
+      holder.getCanonicalClass()));
</#if>
+ }
}
</#list>
</#list>
</#list>
public void cleanup() throws IOException {
- cleanupColumnVectors();
- }
-
- private void cleanupColumnVectors() {
- if (columnVectors != null){
- for(ValueVector vector : columnVectors){
- if(vector!=null){
- vector.clear();
- }
- }
- }
}
public abstract void startNewSchema(List<String> columnNames) throws IOException;
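
A hand-expanded sketch of the converter this template generates for a nullable Int column (the generated source may differ; addField is the abstract per-column callback of StringOutputRecordWriter):

    public class NullableIntStringFieldConverter extends FieldConverter {
      private NullableIntHolder holder = new NullableIntHolder();

      public NullableIntStringFieldConverter(int fieldId, String fieldName, FieldReader reader) {
        super(fieldId, fieldName, reader);
      }

      @Override
      public void writeField() throws IOException {
        if (!reader.isSet()) {
          addField(fieldId, null);   // nulls become a null column value
          return;
        }
        reader.read(holder);
        addField(fieldId, String.valueOf(holder.value));
      }
    }
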
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 29b346ddd..ef4db2a78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -108,7 +108,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
// fall through.
case OK:
try {
- counter += eventBasedRecordWriter.write();
+ counter += eventBasedRecordWriter.write(incoming.getRecordCount());
logger.debug("Total records written so far: {}", counter);
} catch(IOException ex) {
throw new RuntimeException(ex);
@@ -162,8 +162,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
stats.stopSetup();
}
- eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(),
- new RecordValueAccessor(incoming), recordWriter);
+ eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index a2d22cfe4..540977dec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -50,6 +50,10 @@ public class MaterializedField {
.setNamePart(key.path.getAsNamePart());
}
+ public List<MaterializedField> getChildren() {
+ return children;
+ }
+
public void addChild(MaterializedField field){
children.add(field);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 7fbb9c7a6..8b5577c06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -19,20 +19,24 @@ package org.apache.drill.exec.store.easy.json;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
@@ -40,6 +44,8 @@ import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.text.DrillTextRecordWriter;
+import org.apache.hadoop.fs.FileSystem;
public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
@@ -59,7 +65,23 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
@Override
public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
- throw new UnsupportedOperationException("Json Writer is not supported currently.");
+ Map<String, String> options = Maps.newHashMap();
+
+ options.put("location", writer.getLocation());
+
+ FragmentHandle handle = context.getHandle();
+ String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
+ options.put("prefix", fragmentId);
+
+ options.put("separator", " ");
+ options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
+
+ options.put("extension", "json");
+
+ RecordWriter recordWriter = new JsonRecordWriter();
+ recordWriter.init(options);
+
+ return recordWriter;
}
@JsonTypeName("json")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
new file mode 100644
index 000000000..da9f48b77
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.JSONOutputRecordWriter;
+import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWriter {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
+
+ private String location;
+ private String prefix;
+
+ private String fieldDelimiter;
+ private String extension;
+
+ private int index;
+ private FileSystem fs = null;
+ private FSDataOutputStream stream = null;
+
+ private final JsonFactory factory = new JsonFactory();
+
+ // Record write status
+ private boolean fRecordStarted = false; // true from startRecord() until endRecord() is called
+
+ public JsonRecordWriter(){
+ }
+
+ @Override
+ public void init(Map<String, String> writerOptions) throws IOException {
+ this.location = writerOptions.get("location");
+ this.prefix = writerOptions.get("prefix");
+ this.fieldDelimiter = writerOptions.get("separator");
+ this.extension = writerOptions.get("extension");
+
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, writerOptions.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ this.fs = FileSystem.get(conf);
+
+ Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+ try {
+ stream = fs.create(fileName);
+ gen = factory.createGenerator(stream).useDefaultPrettyPrinter();
+ logger.debug("Created file: {}", fileName);
+ } catch (IOException ex) {
+ logger.error("Unable to create file: " + fileName, ex);
+ throw ex;
+ }
+ }
+
+ @Override
+ public void updateSchema(BatchSchema schema) throws IOException {
+ // no op
+ }
+
+ @Override
+ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new MapJsonConverter(fieldId, fieldName, reader);
+ }
+
+ public class MapJsonConverter extends FieldConverter {
+ List<FieldConverter> converters = Lists.newArrayList();
+
+ public MapJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ int i = 0;
+ for (String name : reader) {
+ FieldConverter converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, i++, name, reader.reader(name));
+ converters.add(converter);
+ }
+ }
+
+ @Override
+ public void startField() throws IOException {
+ gen.writeFieldName(fieldName);
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ gen.writeStartObject();
+ for (FieldConverter converter : converters) {
+ converter.startField();
+ converter.writeField();
+ }
+ gen.writeEndObject();
+ }
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new RepeatedMapJsonConverter(fieldId, fieldName, reader);
+ }
+
+ public class RepeatedMapJsonConverter extends FieldConverter {
+ List<FieldConverter> converters = Lists.newArrayList();
+
+ public RepeatedMapJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ int i = 0;
+ for (String name : reader) {
+ FieldConverter converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, i++, name, reader.reader(name));
+ converters.add(converter);
+ }
+ }
+
+ @Override
+ public void startField() throws IOException {
+ gen.writeFieldName(fieldName);
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ gen.writeStartArray();
+ while (reader.next()) {
+ gen.writeStartObject();
+ for (FieldConverter converter : converters) {
+ converter.startField();
+ converter.writeField();
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+ }
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new RepeatedListJsonConverter(fieldId, fieldName, reader);
+ }
+
+ public class RepeatedListJsonConverter extends FieldConverter {
+ FieldConverter converter;
+
+ public RepeatedListJsonConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ converter = EventBasedRecordWriter.getConverter(JsonRecordWriter.this, fieldId, fieldName, reader.reader());
+ }
+
+ @Override
+ public void startField() throws IOException {
+ gen.writeFieldName(fieldName);
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ gen.writeStartArray();
+ while (reader.next()) {
+ converter.writeField();
+ }
+ gen.writeEndArray();
+ }
+ }
+
+ @Override
+ public void startRecord() throws IOException {
+ gen.writeStartObject();
+ fRecordStarted = true;
+ }
+
+ @Override
+ public void endRecord() throws IOException {
+ gen.writeEndObject();
+ fRecordStarted = false;
+ }
+
+ @Override
+ public void abort() throws IOException {
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ gen.flush();
+ stream.close();
+ }
+}
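
Taken together with JSONFormatPlugin above, the writer's lifecycle looks roughly like this (a sketch; the option values are illustrative, the keys match what the plugin populates):

    Map<String, String> options = Maps.newHashMap();   // com.google.common.collect.Maps
    options.put("location", "/tmp/drill-out");
    options.put("prefix", "0_0");                      // majorFragmentId_minorFragmentId
    options.put("separator", " ");
    options.put(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
    options.put("extension", "json");

    RecordWriter writer = new JsonRecordWriter();
    writer.init(options);      // opens <location>/<prefix>_0.json on the configured FileSystem
    // ...EventBasedRecordWriter drives startRecord()/writeField()/endRecord()...
    writer.cleanup();          // flushes the JsonGenerator and closes the stream
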
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index a3363168b..94ccc133a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -18,11 +18,18 @@
package org.apache.drill.exec.store.parquet;
import com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.EventBasedRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -36,8 +43,10 @@ import parquet.io.ColumnIOFactory;
import parquet.io.MessageColumnIO;
import parquet.io.api.RecordConsumer;
import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type;
import parquet.schema.Type.Repetition;
@@ -107,15 +116,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private void newSchema() throws IOException {
List<Type> types = Lists.newArrayList();
for (MaterializedField field : batchSchema) {
- String name = field.getAsSchemaPath().getAsUnescapedPath();
- MinorType minorType = field.getType().getMinorType();
- PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType);
- Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode());
- OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType);
- DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field);
- int length = ParquetTypeHelper.getLengthForMinorType(minorType);
- parquet.schema.Type type = new parquet.schema.PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata);
- types.add(type);
+ types.add(getType(field));
}
schema = new MessageType("root", types);
@@ -132,6 +133,34 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
setUp(schema, consumer);
}
+ private PrimitiveType getPrimitiveType(MaterializedField field) {
+ MinorType minorType = field.getType().getMinorType();
+ String name = field.getLastName();
+ PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType);
+ Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode());
+ OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType);
+ DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field);
+ int length = ParquetTypeHelper.getLengthForMinorType(minorType);
+ return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata);
+ }
+
+ private parquet.schema.Type getType(MaterializedField field) {
+ MinorType minorType = field.getType().getMinorType();
+ DataMode dataMode = field.getType().getMode();
+ switch(minorType) {
+ case MAP:
+ List<parquet.schema.Type> types = Lists.newArrayList();
+ for (MaterializedField childField : field.getChildren()) {
+ types.add(getType(childField));
+ }
+ return new GroupType(dataMode == DataMode.REPEATED ? Repetition.REPEATED : Repetition.OPTIONAL, field.getLastName(), types);
+ case LIST:
+ throw new UnsupportedOperationException("Unsupported type " + minorType);
+ default:
+ return getPrimitiveType(field);
+ }
+ }
+
private void flush() throws IOException {
w.startBlock(recordCount);
store.flush();
@@ -163,6 +192,70 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
}
@Override
+ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new MapParquetConverter(fieldId, fieldName, reader);
+ }
+
+ public class MapParquetConverter extends FieldConverter {
+ List<FieldConverter> converters = Lists.newArrayList();
+
+ public MapParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ int i = 0;
+ for (String name : reader) {
+ FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name));
+ converters.add(converter);
+ }
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ consumer.startField(fieldName, fieldId);
+ consumer.startGroup();
+ for (FieldConverter converter : converters) {
+ converter.writeField();
+ }
+ consumer.endGroup();
+ consumer.endField(fieldName, fieldId);
+ }
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new RepeatedMapParquetConverter(fieldId, fieldName, reader);
+ }
+
+ public class RepeatedMapParquetConverter extends FieldConverter {
+ List<FieldConverter> converters = Lists.newArrayList();
+
+ public RepeatedMapParquetConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ int i = 0;
+ for (String name : reader) {
+ FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name));
+ converters.add(converter);
+ }
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ if (reader.size() == 0) {
+ return;
+ }
+ consumer.startField(fieldName, fieldId);
+ while (reader.next()) {
+ consumer.startGroup();
+ for (FieldConverter converter : converters) {
+ converter.writeField();
+ }
+ consumer.endGroup();
+ }
+ consumer.endField(fieldName, fieldId);
+ }
+ }
+
+
+ @Override
public void startRecord() throws IOException {
consumer.startMessage();
}
@@ -176,7 +269,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
@Override
public void abort() throws IOException {
- //To change body of implemented methods use File | Settings | File Templates.
}
@Override
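
The new getType recursion means a map column becomes a Parquet group. A sketch of the schema it builds for a repeated map `a` holding one optional int child `b` (names and types are illustrative; GroupType and PrimitiveType are the parquet.schema classes imported above):

    GroupType a = new GroupType(Repetition.REPEATED, "a",
        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "b"));
    // equivalent Parquet schema syntax:  repeated group a { optional int32 b; }
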
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
index 55f2b7244..23d95b8c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordWriter.java
@@ -23,8 +23,11 @@ import java.io.PrintStream;
import java.util.List;
import java.util.Map;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.StringOutputRecordWriter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -117,6 +120,33 @@ public class DrillTextRecordWriter extends StringOutputRecordWriter {
}
@Override
+ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new ComplexStringFieldConverter(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new ComplexStringFieldConverter(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new ComplexStringFieldConverter(fieldId, fieldName, reader);
+ }
+
+ public class ComplexStringFieldConverter extends FieldConverter {
+
+ public ComplexStringFieldConverter(int fieldId, String fieldName, FieldReader reader) {
+ super(fieldId, fieldName, reader);
+ }
+
+ @Override
+ public void writeField() throws IOException {
+ addField(fieldId, reader.readObject().toString());
+ }
+ }
+
+ @Override
public void cleanup() throws IOException {
super.cleanup();
if (stream != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index cb770321d..c67c04743 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
import org.apache.drill.exec.vector.UInt4Vector;
@@ -79,6 +80,9 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
clear();
offsets.allocateNew(parentValueCount+1);
offsets.zeroVector();
+ for(ValueVector v : vectors.values()){
+ AllocationHelper.allocate(v, parentValueCount, 50, childValueCount);
+ }
mutator.reset();
accessor.reset();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
index 53506625b..b89d26d34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -76,7 +76,10 @@ public class RepeatedMapReaderImpl extends AbstractFieldReader{
private int maxOffset;
public int size(){
- return maxOffset - currentOffset;
+ if (isNull()) {
+ return 0;
+ }
+ return maxOffset - (currentOffset < 0 ? 0 : currentOffset);
}
public void setPosition(int index){