aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-01-23 18:32:03 -0800
committerJacques Nadeau <jacques@apache.org>2014-03-03 23:21:50 -0800
commitcdf46fd36fdfc2e3029a6b2e077330c665e43c2e (patch)
treed69cb592765321c67c1c6571e89c840772e2c7f8 /exec/java-exec/src
parenta9a7ea84c99d8a9efcccc7d9a870121a26212b49 (diff)
DRILL-357: Hive Storage Engine phase 2 - hive record reader
Diffstat (limited to 'exec/java-exec/src')
-rw-r--r--exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java38
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java173
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java50
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java473
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java247
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java71
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java194
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java124
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java325
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java172
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java (renamed from exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java)0
-rw-r--r--exec/java-exec/src/main/resources/drill-module.conf3
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java)37
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java67
-rw-r--r--exec/java-exec/src/test/resources/hive/test.json75
20 files changed, 2079 insertions, 29 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 57927a71d..b059d89fa 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -260,7 +260,43 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
assert index >= 0;
int currentOffset = offsetVector.getAccessor().get(index);
offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
- data.setBytes(currentOffset, bytes);
+ data.setBytes(currentOffset, bytes, 0, bytes.length);
+ }
+
+ public boolean setSafe(int index, byte[] bytes) {
+ assert index >= 0;
+ int currentOffset = offsetVector.getAccessor().get(index);
+ if (data.capacity() < currentOffset + bytes.length) return false;
+ offsetVector.getMutator().set(index + 1, currentOffset + bytes.length);
+ data.setBytes(currentOffset, bytes, 0, bytes.length);
+ return true;
+ }
+
+ /**
+ * Set the variable length element at the specified index to the supplied byte array.
+ *
+ * @param index position of the bit to set
+ * @param bytes array of bytes to write
+ * @param start start index of bytes to write
+ * @param length length of bytes to write
+ */
+ public void set(int index, byte[] bytes, int start, int length) {
+ assert index >= 0;
+ int currentOffset = offsetVector.getAccessor().get(index);
+ offsetVector.getMutator().set(index + 1, currentOffset + length);
+ data.setBytes(currentOffset, bytes, start, length);
+ }
+
+ public boolean setSafe(int index, byte[] bytes, int start, int length) {
+ assert index >= 0;
+
+ int currentOffset = offsetVector.getAccessor().get(index);
+
+ if (data.capacity() < currentOffset + length) return false;
+
+ offsetVector.getMutator().set(index + 1, currentOffset + length);
+ data.setBytes(currentOffset, bytes, start, length);
+ return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
index e9946dfb8..688e68005 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
@@ -28,7 +28,7 @@ public class TemplateClassDefinition<T>{
private final Class<T> iface;
private final Class<?> template;
private final SignatureHolder signature;
- private final AtomicLong classNumber = new AtomicLong(0);
+ private static final AtomicLong classNumber = new AtomicLong(0);
public <X extends T> TemplateClassDefinition(Class<T> iface, Class<X> template) {
super();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 8c1487ca0..94fcac51c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -72,6 +72,7 @@ public class Wrapper {
public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
Preconditions.checkState(!endpointsAssigned);
+ Preconditions.checkNotNull(endpoint);
EndpointAffinity ea = endpointAffinity.get(endpoint);
if (ea == null) {
ea = new EndpointAffinity(endpoint);
@@ -149,6 +150,7 @@ public class Wrapper {
int start = ThreadLocalRandom.current().nextInt(div);
// round robin with random start.
for (int i = start; i < start + width; i++) {
+ Preconditions.checkNotNull(all.get(i % div));
endpoints.add(all.get(i % div));
}
} else {
@@ -156,6 +158,7 @@ public class Wrapper {
Collections.sort(values);
values = Lists.reverse(values);
for (int i = 0; i < width; i++) {
+ Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint());
endpoints.add(values.get(i%values.size()).getEndpoint());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
new file mode 100644
index 000000000..880970643
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.util.*;
+
+public class HiveInputReader {
+ public static void main(String args[]) throws Exception {
+/*
+ String[] columnNames = {"n_nationkey", "n_name", "n_regionkey", "n_comment"};
+ String[] columnTypes = {"bigint", "string", "bigint", "string"};
+
+ List<FieldSchema> cols = Lists.newArrayList();
+
+ for (int i = 0; i < columnNames.length; i++) {
+ cols.add(new FieldSchema(columnNames[i], columnTypes[i], null));
+ }
+ String location = "file:///tmp/nation_s";
+ String inputFormat = TextInputFormat.class.getCanonicalName();
+ String serdeLib = LazySimpleSerDe.class.getCanonicalName();
+// String inputFormat = HiveHBaseTableInputFormat.class.getCanonicalName();
+// String serdeLib = HBaseSerDe.class.getCanonicalName();
+ Map<String, String> serdeParams = new HashMap();
+// serdeParams.put("serialization.format", "1");
+// serdeParams.put("hbase.columns.mapping", ":key,f:name,f:regionkey,f:comment");
+ serdeParams.put("serialization.format", "|");
+ serdeParams.put("field.delim", "|");
+
+
+ Map<String, String> tableParams = new HashMap();
+ tableParams.put("hbase.table.name", "nation");
+ SerDeInfo serDeInfo = new SerDeInfo(null, serdeLib, serdeParams);
+ StorageDescriptor storageDescriptor = new StorageDescriptor(cols, location, inputFormat, null, false, -1, serDeInfo, null, null, null);
+ Table table = new Table("table", "default", "sphillips", 0, 0, 0, storageDescriptor, new ArrayList<FieldSchema>(), tableParams, null, null, "MANAGED_TABLE");
+ Properties properties = MetaStoreUtils.getTableMetadata(table);
+ */
+
+ HiveConf conf = new HiveConf();
+ conf.set("hive.metastore.uris", "thrift://10.10.31.51:9083");
+ HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
+ Table table = client.getTable("default", "nation");
+ Properties properties = MetaStoreUtils.getTableMetadata(table);
+
+ Path path = new Path(table.getSd().getLocation());
+ JobConf job = new JobConf();
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+// job.set("hbase.zookeeper.quorum", "10.10.31.51");
+// job.set("hbase.zookeeper.property.clientPort", "5181");
+ InputFormat f = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
+ job.setInputFormat(f.getClass());
+ FileInputFormat.addInputPath(job, path);
+ InputFormat format = job.getInputFormat();
+ SerDe serde = (SerDe) Class.forName(table.getSd().getSerdeInfo().getSerializationLib()).getConstructor().newInstance();
+ serde.initialize(job, properties);
+ ObjectInspector inspector = serde.getObjectInspector();
+ ObjectInspector.Category cat = inspector.getCategory();
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(inspector);
+ List<String> columns = null;
+ List<TypeInfo> colTypes = null;
+ List<ObjectInspector> fieldObjectInspectors = Lists.newArrayList();
+
+ switch(typeInfo.getCategory()) {
+ case STRUCT:
+ columns = ((StructTypeInfo) typeInfo).getAllStructFieldNames();
+ colTypes = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos();
+ for (int i = 0; i < columns.size(); i++) {
+ System.out.print(columns.get(i));
+ System.out.print(" ");
+ System.out.print(colTypes.get(i));
+ }
+ System.out.println("");
+ for (StructField field : ((StructObjectInspector)inspector).getAllStructFieldRefs()) {
+ fieldObjectInspectors.add(field.getFieldObjectInspector());
+ }
+ }
+
+ for (InputSplit split : format.getSplits(job, 1)) {
+ String encoded = serializeInputSplit(split);
+ System.out.println(encoded);
+ InputSplit newSplit = deserializeInputSplit(encoded, split.getClass().getCanonicalName());
+ System.out.print("Length: " + newSplit.getLength() + " ");
+ System.out.print("Locations: ");
+ for (String loc : newSplit.getLocations()) System.out.print(loc + " " );
+ System.out.println();
+ }
+
+ for (InputSplit split : format.getSplits(job, 1)) {
+ RecordReader reader = format.getRecordReader(split, job, Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+ int count = 0;
+ while (reader.next(key, value)) {
+ List<Object> values = ((StructObjectInspector) inspector).getStructFieldsDataAsList(serde.deserialize((Writable) value));
+ StructObjectInspector sInsp = (StructObjectInspector) inspector;
+ Object obj = sInsp.getStructFieldData(serde.deserialize((Writable) value) , sInsp.getStructFieldRef("n_name"));
+ System.out.println(obj);
+ /*
+ for (Object obj : values) {
+ PrimitiveObjectInspector.PrimitiveCategory pCat = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveCategory();
+ Object pObj = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveJavaObject(obj);
+ System.out.print(pObj + " ");
+ }
+ */
+ System.out.println("");
+ }
+ }
+ }
+
+ public static String serializeInputSplit(InputSplit split) throws IOException {
+ ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+ split.write(byteArrayOutputStream);
+ return Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+ }
+
+ public static InputSplit deserializeInputSplit(String base64, String className) throws Exception {
+ InputSplit split;
+ if (Class.forName(className) == FileSplit.class) {
+ split = new FileSplit((Path) null, 0, 0, (String[])null);
+ } else {
+ split = (InputSplit) Class.forName(className).getConstructor().newInstance();
+ }
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
+ split.readFields(byteArrayDataInput);
+ return split;
+ }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 41a4d3d2b..6211e2186 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -17,19 +17,47 @@
*/
package org.apache.drill.exec.store.hive;
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.ReadEntry;
import org.apache.drill.exec.physical.base.Size;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.List;
public class HiveReadEntry implements ReadEntry {
- private final HiveConf conf;
- private final String table;
- private Size size;
- public HiveReadEntry(HiveConf conf, String table) {
- this.conf = conf;
+ @JsonProperty("table")
+ public HiveTable table;
+ @JsonProperty("partitions")
+ public List<HiveTable.HivePartition> partitions;
+
+ @JsonIgnore
+ private List<Partition> partitionsUnwrapped = Lists.newArrayList();
+
+ @JsonCreator
+ public HiveReadEntry(@JsonProperty("table") HiveTable table, @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) {
this.table = table;
+ this.partitions = partitions;
+ if (partitions != null) {
+ for(HiveTable.HivePartition part : partitions) {
+ partitionsUnwrapped.add(part.getPartition());
+ }
+ }
+ }
+
+ @JsonIgnore
+ public Table getTable() {
+ return table.getTable();
+ }
+
+ @JsonIgnore
+ public List<Partition> getPartitions() {
+ return partitionsUnwrapped;
}
@Override
@@ -40,11 +68,10 @@ public class HiveReadEntry implements ReadEntry {
@Override
public Size getSize() {
- if (size != null) {
- // TODO: contact the metastore and find the size of the data in table
- size = new Size(1, 1);
- }
+ // TODO: contact the metastore and find the size of the data in table
+ Size size = new Size(1, 1);
return size;
}
}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
new file mode 100644
index 000000000..ef7266c52
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+public class HiveReadEntryOld implements ReadEntry {
+ private final HiveConf conf;
+ private final String table;
+ private Size size;
+
+ public HiveReadEntryOld(HiveConf conf, String table) {
+ this.conf = conf;
+ this.table = table;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ // TODO: need to come up with way to calculate the cost for Hive tables
+ return new OperatorCost(1, 1, 2, 2);
+ }
+
+ @Override
+ public Size getSize() {
+ if (size != null) {
+ // TODO: contact the metastore and find the size of the data in table
+ size = new Size(1, 1);
+ }
+
+ return size;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
new file mode 100644
index 000000000..0a31a1252
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+public class HiveRecordReader implements RecordReader {
+
+ protected Table table;
+ protected Partition partition;
+ protected InputSplit inputSplit;
+ protected FragmentContext context;
+ protected List<FieldReference> columns;
+ protected List<String> columnNames;
+ protected List<String> partitionNames = Lists.newArrayList();
+ protected List<String> selectedPartitionNames = Lists.newArrayList();
+ protected List<String> selectedPartitionTypes = Lists.newArrayList();
+ protected List<String> tableColumns;
+ protected SerDe serde;
+ protected StructObjectInspector sInspector;
+ protected List<PrimitiveObjectInspector> fieldInspectors = Lists.newArrayList();
+ protected List<PrimitiveCategory> primitiveCategories = Lists.newArrayList();
+ protected Object key, value;
+ protected org.apache.hadoop.mapred.RecordReader reader;
+ protected List<ValueVector> vectors = Lists.newArrayList();
+ protected List<ValueVector> pVectors = Lists.newArrayList();
+ protected Object redoRecord;
+ List<Object> partitionValues = Lists.newArrayList();
+
+ protected static final int TARGET_RECORD_COUNT = 4000;
+
+ public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException {
+ this.table = table;
+ this.partition = partition;
+ this.inputSplit = inputSplit;
+ this.context = context;
+ this.columns = columns;
+ init();
+ }
+
+ private void init() throws ExecutionSetupException {
+ Properties properties;
+ JobConf job = new JobConf();
+ if (partition != null) {
+ properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ } else {
+ properties = MetaStoreUtils.getTableMetadata(table);
+ }
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+ InputFormat format;
+ String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib();
+ String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();
+ try {
+ format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance();
+ Class c = Class.forName(sLib);
+ serde = (SerDe) c.getConstructor().newInstance();
+ serde.initialize(job, properties);
+ } catch (ReflectiveOperationException | SerDeException e) {
+ throw new ExecutionSetupException("Unable to instantiate InputFormat", e);
+ }
+ job.setInputFormat(format.getClass());
+
+ if (partition != null) {
+ List<FieldSchema> partitionKeys = table.getPartitionKeys();
+ for (FieldSchema field : partitionKeys) {
+ partitionNames.add(field.getName());
+ }
+ }
+
+ try {
+ ObjectInspector oi = serde.getObjectInspector();
+ if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+ throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+ }
+ sInspector = (StructObjectInspector) oi;
+ StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector);
+ if (columns == null) {
+ columnNames = sTypeInfo.getAllStructFieldNames();
+ tableColumns = columnNames;
+ } else {
+ tableColumns = sTypeInfo.getAllStructFieldNames();
+ List<Integer> columnIds = Lists.newArrayList();
+ columnNames = Lists.newArrayList();
+ for (FieldReference field : columns) {
+ String columnName = field.getPath().toString();
+ if (!tableColumns.contains(columnName)) {
+ if (partition != null && partitionNames.contains(columnName)) {
+ selectedPartitionNames.add(columnName);
+ } else {
+ throw new ExecutionSetupException(String.format("Column %s does not exist", columnName));
+ }
+ } else {
+ columnIds.add(tableColumns.indexOf(columnName));
+ columnNames.add(columnName);
+ }
+ }
+ ColumnProjectionUtils.appendReadColumnIDs(job, columnIds);
+ ColumnProjectionUtils.appendReadColumnNames(job, columnNames);
+ }
+ for (String columnName : columnNames) {
+ ObjectInspector poi = sInspector.getStructFieldRef(columnName).getFieldObjectInspector();
+ if(poi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+ throw new UnsupportedOperationException(String.format("%s type not supported", poi.getCategory()));
+ }
+ PrimitiveObjectInspector pInspector = (PrimitiveObjectInspector) poi;
+ fieldInspectors.add(pInspector);
+ primitiveCategories.add(pInspector.getPrimitiveCategory());
+ }
+
+ if (columns == null) {
+ selectedPartitionNames = partitionNames;
+ }
+
+ if (partition != null) {
+ for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+ FieldSchema field = table.getPartitionKeys().get(i);
+ if (selectedPartitionNames.contains(field.getName())) {
+ selectedPartitionTypes.add(field.getType());
+ partitionValues.add(convertPartitionType(field.getType(), partition.getValues().get(i)));
+ }
+ }
+ }
+ } catch (SerDeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ try {
+ reader = format.getRecordReader(inputSplit, job, Reporter.NULL);
+ } catch (IOException e) {
+ throw new ExecutionSetupException("Failed to get Recordreader", e);
+ }
+ key = reader.createKey();
+ value = reader.createValue();
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ output.removeAllFields();
+ try {
+ for (int i = 0; i < columnNames.size(); i++) {
+ PrimitiveCategory pCat = primitiveCategories.get(i);
+ MaterializedField field = MaterializedField.create(new SchemaPath(columnNames.get(i), ExpressionPosition.UNKNOWN), getMajorType(pCat));
+ ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
+ vectors.add(vv);
+ output.addField(vv);
+ }
+ for (int i = 0; i < selectedPartitionNames.size(); i++) {
+ String type = selectedPartitionTypes.get(i);
+ MaterializedField field = MaterializedField.create(new SchemaPath(selectedPartitionNames.get(i), ExpressionPosition.UNKNOWN), getMajorType(type));
+ ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
+ pVectors.add(vv);
+ output.addField(vv);
+ }
+ output.setNewSchema();
+ } catch(SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ protected void populatePartitionVectors(int recordCount) {
+ for (int i = 0; i < pVectors.size(); i++) {
+ int size = 50;
+ ValueVector vector = pVectors.get(i);
+ Object val = partitionValues.get(i);
+ if (selectedPartitionTypes.get(i).equals("string") || selectedPartitionTypes.get(i).equals("binary")) {
+ size = ((byte[]) partitionValues.get(i)).length;
+ }
+ VectorAllocator.getAllocator(vector, size).alloc(recordCount);
+ switch(selectedPartitionTypes.get(i)) {
+ case "boolean": {
+ BitVector v = (BitVector) vector;
+ Boolean value = (Boolean) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value ? 1 : 0);
+ }
+ break;
+ }
+ case "tinyint": {
+ TinyIntVector v = (TinyIntVector) vector;
+ byte value = (byte) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "double": {
+ Float8Vector v = (Float8Vector) vector;
+ double value = (double) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "float": {
+ Float4Vector v = (Float4Vector) vector;
+ float value = (float) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "int": {
+ IntVector v = (IntVector) vector;
+ int value = (int) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "bigint": {
+ BigIntVector v = (BigIntVector) vector;
+ long value = (long) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "smallint": {
+ SmallIntVector v = (SmallIntVector) vector;
+ short value = (short) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ case "string": {
+ VarCharVector v = (VarCharVector) vector;
+ byte[] value = (byte[]) val;
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().set(j, value);
+ }
+ break;
+ }
+ default:
+ throw new UnsupportedOperationException("Could not determine type: " + selectedPartitionTypes.get(i));
+ }
+ vector.getMutator().setValueCount(recordCount);
+ }
+ }
+
+ private Object convertPartitionType(String type, String value) {
+ switch (type) {
+ case "boolean":
+ return Boolean.parseBoolean(value);
+ case "tinyint":
+ return Byte.parseByte(value);
+ case "double":
+ return Double.parseDouble(value);
+ case "float":
+ return Float.parseFloat(value);
+ case "int":
+ return Integer.parseInt(value);
+ case "bigint":
+ return Long.parseLong(value);
+ case "smallint":
+ return Short.parseShort(value);
+ case "string":
+ return value.getBytes();
+ default:
+ throw new UnsupportedOperationException("Could not determine type: " + type);
+ }
+ }
+
+ public static TypeProtos.MajorType getMajorType(String type) {
+ switch(type) {
+ case "binary":
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ case "boolean":
+ return Types.required(TypeProtos.MinorType.BIT);
+ case "tinyint":
+ return Types.required(TypeProtos.MinorType.TINYINT);
+ case "decimal":
+ return Types.required(TypeProtos.MinorType.DECIMAL16);
+ case "double":
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ case "float":
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case "int":
+ return Types.required(TypeProtos.MinorType.INT);
+ case "bigint":
+ return Types.required(TypeProtos.MinorType.BIGINT);
+ case "smallint":
+ return Types.required(TypeProtos.MinorType.SMALLINT);
+ case "string":
+ return Types.required(TypeProtos.MinorType.VARCHAR);
+ case "varchar":
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type: " + type);
+ }
+ }
+
+ public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) {
+ switch(pCat) {
+ case BINARY:
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ case BOOLEAN:
+ return Types.required(TypeProtos.MinorType.BIT);
+ case BYTE:
+ return Types.required(TypeProtos.MinorType.TINYINT);
+ case DECIMAL:
+ return Types.required(TypeProtos.MinorType.DECIMAL16);
+ case DOUBLE:
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ case FLOAT:
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case INT:
+ return Types.required(TypeProtos.MinorType.INT);
+ case LONG:
+ return Types.required(TypeProtos.MinorType.BIGINT);
+ case SHORT:
+ return Types.required(TypeProtos.MinorType.SMALLINT);
+ case STRING:
+ return Types.required(TypeProtos.MinorType.VARCHAR);
+ case TIMESTAMP:
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type");
+ }
+ }
+
+ public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) {
+ switch(pCat) {
+ case BINARY:
+ ((VarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue);
+ case BOOLEAN:
+ boolean isSet = (boolean) fieldValue;
+ return ((BitVector) vv).getMutator().setSafe(index, isSet ? 1 : 0 );
+ case BYTE:
+ return ((TinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue);
+ case DECIMAL:
+ throw new UnsupportedOperationException();
+ case DOUBLE:
+ return ((Float8Vector) vv).getMutator().setSafe(index, (double) fieldValue);
+ case FLOAT:
+ return ((Float4Vector) vv).getMutator().setSafe(index, (float) fieldValue);
+ case INT:
+ return ((IntVector) vv).getMutator().setSafe(index, (int) fieldValue);
+ case LONG:
+ return ((BigIntVector) vv).getMutator().setSafe(index, (long) fieldValue);
+ case SHORT:
+ return ((SmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue);
+ case STRING:
+ int len = ((Text) fieldValue).getLength();
+ byte[] bytes = ((Text) fieldValue).getBytes();
+ return ((VarCharVector) vv).getMutator().setSafe(index, bytes, 0, len);
+ case TIMESTAMP:
+ throw new UnsupportedOperationException();
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type");
+ }
+ }
+
+ @Override
+ public int next() {
+ for (ValueVector vv : vectors) {
+ VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT);
+ }
+ try {
+ int recordCount = 0;
+ if (redoRecord != null) {
+ Object deSerializedValue = serde.deserialize((Writable) redoRecord);
+ for (int i = 0; i < columnNames.size(); i++) {
+ Object obj;
+ String columnName = columnNames.get(i);
+ if (primitiveCategories.get(i) == PrimitiveCategory.STRING) {
+ obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ } else {
+ obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ }
+ boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+ if (!success) {
+ throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnName));
+ }
+ }
+ redoRecord = null;
+ recordCount++;
+ }
+ while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+ Object deSerializedValue = serde.deserialize((Writable) value);
+ for (int i = 0; i < columnNames.size(); i++) {
+ Object obj;
+ String columnName = columnNames.get(i);
+ if (primitiveCategories.get(i) == PrimitiveCategory.STRING) {
+ obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ } else {
+ obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName)));
+ }
+ boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj);
+ if (!success) {
+ redoRecord = value;
+ if (partition != null) populatePartitionVectors(recordCount);
+ for (ValueVector v : vectors) {
+ v.getMutator().setValueCount(recordCount);
+ }
+ if (partition != null) populatePartitionVectors(recordCount);
+ return recordCount;
+ }
+ }
+ recordCount++;
+ }
+ for (ValueVector v : vectors) {
+ v.getMutator().setValueCount(recordCount);
+ }
+ if (partition != null) populatePartitionVectors(recordCount);
+ return recordCount;
+ } catch (IOException | SerDeException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
new file mode 100644
index 000000000..bc2a16bc6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.annotation.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.util.*;
+
+@JsonTypeName("hive-scan")
+public class HiveScan extends AbstractGroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class);
+
+ @JsonProperty("hive-table")
+ public HiveReadEntry hiveReadEntry;
+ @JsonIgnore
+ private Table table;
+ @JsonIgnore
+ private List<InputSplit> inputSplits = Lists.newArrayList();
+ @JsonIgnore
+ public HiveStorageEngine storageEngine;
+ @JsonProperty("storageengine")
+ public HiveStorageEngineConfig engineConfig;
+
+ @JsonIgnore
+ public List<Partition> partitions;
+ @JsonIgnore
+ private Collection<DrillbitEndpoint> endpoints;
+
+ @JsonProperty("columns")
+ public List<FieldReference> columns;
+
+ @JsonIgnore
+ List<List<InputSplit>> mappings;
+
+ @JsonIgnore
+ Map<InputSplit, Partition> partitionMap = new HashMap();
+
+ @JsonCreator
+ public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storageengine") HiveStorageEngineConfig config,
+ @JsonProperty("columns") List<FieldReference> columns,
+ @JacksonInject StorageEngineRegistry engineRegistry) throws ExecutionSetupException {
+ this.hiveReadEntry = hiveReadEntry;
+ this.table = hiveReadEntry.getTable();
+ this.engineConfig = config;
+ this.storageEngine = (HiveStorageEngine) engineRegistry.getEngine(config);
+ this.columns = columns;
+ this.partitions = hiveReadEntry.getPartitions();
+ getSplits();
+ endpoints = storageEngine.getContext().getBits();
+ }
+
+ public HiveScan(HiveReadEntry hiveReadEntry, HiveStorageEngine storageEngine, List<FieldReference> columns) throws ExecutionSetupException {
+ this.table = hiveReadEntry.getTable();
+ this.hiveReadEntry = hiveReadEntry;
+ this.columns = columns;
+ this.partitions = hiveReadEntry.getPartitions();
+ getSplits();
+ endpoints = storageEngine.getContext().getBits();
+ this.engineConfig = storageEngine.getConfig();
+ }
+
+ public List<FieldReference> getColumns() {
+ return columns;
+ }
+
+ private void getSplits() throws ExecutionSetupException {
+ try {
+ if (partitions == null || partitions.size() == 0) {
+ Properties properties = MetaStoreUtils.getTableMetadata(table);
+ JobConf job = new JobConf();
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+ InputFormat format = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
+ job.setInputFormat(format.getClass());
+ Path path = new Path(table.getSd().getLocation());
+ FileInputFormat.addInputPath(job, path);
+ format = job.getInputFormat();
+ for (InputSplit split : format.getSplits(job, 1)) {
+ inputSplits.add(split);
+ }
+ for (InputSplit split : inputSplits) {
+ partitionMap.put(split, null);
+ }
+ } else {
+ for (Partition partition : partitions) {
+ Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+ JobConf job = new JobConf();
+ for (Object obj : properties.keySet()) {
+ job.set((String) obj, (String) properties.get(obj));
+ }
+ InputFormat format = (InputFormat) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance();
+ job.setInputFormat(format.getClass());
+ FileInputFormat.addInputPath(job, new Path(partition.getSd().getLocation()));
+ format = job.getInputFormat();
+ InputSplit[] splits = format.getSplits(job,1);
+ for (InputSplit split : splits) {
+ inputSplits.add(split);
+ partitionMap.put(split, partition);
+ }
+ }
+ }
+ } catch (ReflectiveOperationException | IOException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) {
+ mappings = Lists.newArrayList();
+ for (int i = 0; i < endpoints.size(); i++) {
+ mappings.add(new ArrayList<InputSplit>());
+ }
+ int count = endpoints.size();
+ for (int i = 0; i < inputSplits.size(); i++) {
+ mappings.get(i % count).add(inputSplits.get(i));
+ }
+ }
+
+ public static String serializeInputSplit(InputSplit split) throws IOException {
+ ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput();
+ split.write(byteArrayOutputStream);
+ String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray());
+ logger.debug("Encoded split string for split {} : {}", split, encoded);
+ return encoded;
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+ try {
+ List<InputSplit> splits = mappings.get(minorFragmentId);
+ List<Partition> parts = Lists.newArrayList();
+ List<String> encodedInputSplits = Lists.newArrayList();
+ List<String> splitTypes = Lists.newArrayList();
+ for (InputSplit split : splits) {
+ parts.add(partitionMap.get(split));
+ encodedInputSplits.add(serializeInputSplit(split));
+ splitTypes.add(split.getClass().getCanonicalName());
+ }
+ if (parts.contains(null)) parts = null;
+ return new HiveSubScan(table, parts, encodedInputSplits, splitTypes, columns);
+ } catch (IOException | ReflectiveOperationException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public int getMaxParallelizationWidth() {
+ return inputSplits.size();
+ }
+
+ @Override
+ public List<EndpointAffinity> getOperatorAffinity() {
+ Map<String, DrillbitEndpoint> endpointMap = new HashMap();
+ for (DrillbitEndpoint endpoint : endpoints) {
+ endpointMap.put(endpoint.getAddress(), endpoint);
+ logger.debug("endpoing address: {}", endpoint.getAddress());
+ }
+ Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap();
+ try {
+ long totalSize = 0;
+ for (InputSplit split : inputSplits) {
+ totalSize += Math.max(1, split.getLength());
+ }
+ for (InputSplit split : inputSplits) {
+ float affinity = ((float) Math.max(1, split.getLength())) / totalSize;
+ for (String loc : split.getLocations()) {
+ logger.debug("split location: {}", loc);
+ DrillbitEndpoint endpoint = endpointMap.get(loc);
+ if (endpoint != null) {
+ if (affinityMap.containsKey(endpoint)) {
+ affinityMap.get(endpoint).addAffinity(affinity);
+ } else {
+ affinityMap.put(endpoint, new EndpointAffinity(endpoint, affinity));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ for (DrillbitEndpoint ep : affinityMap.keySet()) {
+ Preconditions.checkNotNull(ep);
+ }
+ for (EndpointAffinity a : affinityMap.values()) {
+ Preconditions.checkNotNull(a.getEndpoint());
+ }
+ return Lists.newArrayList(affinityMap.values());
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - this is wrong, need to populate correctly
+ return new Size(10,10);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ return new HiveScan(hiveReadEntry, storageEngine, columns);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
new file mode 100644
index 000000000..b155661ed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+import java.util.List;
+
+public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+ List<RecordReader> readers = Lists.newArrayList();
+ Table table = config.table;
+ List<InputSplit> splits = config.getInputSplits();
+ List<Partition> partitions = config.partitions;
+ if (partitions == null || partitions.size() == 0) {
+ if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
+ table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
+ config.getColumns() != null) {
+ for (InputSplit split : splits) {
+ readers.add(new HiveTextRecordReader(table, null, split, config.getColumns(), context));
+ }
+ } else {
+ for (InputSplit split : splits) {
+ readers.add(new HiveRecordReader(table, null, split, config.getColumns(), context));
+ }
+ }
+ } else {
+ int i = 0;
+ if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
+ table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
+ config.getColumns() != null) {
+ for (InputSplit split : splits) {
+ readers.add(new HiveTextRecordReader(table, partitions.get(i++), split, config.getColumns(), context));
+ }
+ } else {
+ for (InputSplit split : splits) {
+ readers.add(new HiveRecordReader(config.table, partitions.get(i++), split, config.getColumns(), context));
+ }
+ }
+ }
+ return new ScanBatch(context, readers.iterator());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
new file mode 100644
index 000000000..0f6f3bc4a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HiveStorageEngine extends AbstractStorageEngine {
+
+ private HiveStorageEngineConfig config;
+ private HiveConf hiveConf;
+ private HiveSchemaProvider schemaProvider;
+ static private DrillbitContext context;
+
+ public HiveStorageEngine(HiveStorageEngineConfig config, DrillbitContext context) throws ExecutionSetupException {
+ this.config = config;
+ this.context = context;
+ this.hiveConf = config.getHiveConf();
+ }
+
+ public HiveStorageEngineConfig getConfig() {
+ return config;
+ }
+
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public HiveScan getPhysicalScan(Scan scan) throws IOException {
+ HiveReadEntry hiveReadEntry = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
+ try {
+ List<Partition> partitions = getSchemaProvider().getPartitions(hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName());
+ return new HiveScan(hiveReadEntry, this, null);
+ } catch (ExecutionSetupException | TException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public HiveSchemaProvider getSchemaProvider() {
+ try {
+ if (schemaProvider == null) {
+ schemaProvider = new HiveSchemaProvider(config, context.getConfig());
+ }
+ return schemaProvider;
+ } catch (ExecutionSetupException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ List<String> getPartitions(String dbName, String tableName) throws TException {
+ List<Partition> partitions = getSchemaProvider().getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+ List<String> partitionLocations = Lists.newArrayList();
+ if (partitions == null) return null;
+ for (Partition part : partitions) {
+ partitionLocations.add(part.getSd().getLocation());
+ }
+ return partitionLocations;
+ }
+
+ public static class HiveEntry implements ReadEntry {
+
+ private Table table;
+
+ public HiveEntry(Table table) {
+ this.table = table;
+ }
+
+ public Table getTable() {
+ return table;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+ "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
+ }
+
+ @Override
+ public Size getSize() {
+ throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " +
+ "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost.");
+ }
+ }
+
+ public static class HiveSchemaProvider implements SchemaProvider {
+
+ private HiveConf hiveConf;
+ private HiveMetaStoreClient metaClient;
+
+ public HiveSchemaProvider(HiveStorageEngineConfig config, DrillConfig dConfig) throws ExecutionSetupException {
+ hiveConf = config.getHiveConf();
+ }
+
+ public HiveMetaStoreClient getMetaClient() throws MetaException {
+ if (metaClient == null) {
+ metaClient = new HiveMetaStoreClient(hiveConf);
+ }
+ return metaClient;
+ }
+
+ public Table getTable(String dbName, String tableName) throws TException {
+ HiveMetaStoreClient mClient = getMetaClient();
+ try {
+ return mClient.getTable(dbName, tableName);
+ }catch (NoSuchObjectException e) {
+ logger.error("Database: {} table: {} not found", dbName, tableName);
+ throw new RuntimeException(e);
+ } catch (TException e) {
+ mClient.reconnect();
+ return mClient.getTable(dbName, tableName);
+ }
+ }
+
+ List<Partition> getPartitions(String dbName, String tableName) throws TException {
+ HiveMetaStoreClient mClient = getMetaClient();
+ List<Partition> partitions;
+ try {
+ partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+ } catch (TException e) {
+ mClient.reconnect();
+ partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE);
+ }
+ return partitions;
+ }
+
+ @Override
+ public HiveReadEntry getSelectionBaseOnName(String name) {
+ String[] dbNameTableName = name.split("\\.");
+ String dbName;
+ String t;
+ if (dbNameTableName.length > 1) {
+ dbName = dbNameTableName[0];
+ t = dbNameTableName[1];
+ } else {
+ dbName = "default";
+ t = name;
+ }
+
+ try {
+ Table table = getTable(dbName, t);
+ List<Partition> partitions = getPartitions(dbName, t);
+ List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList();
+ for(Partition part : partitions) {
+ hivePartitions.add(new HiveTable.HivePartition(part));
+ }
+ if (hivePartitions.size() == 0) hivePartitions = null;
+ return new HiveReadEntry(new HiveTable(table), hivePartitions);
+ } catch (NoSuchObjectException e) {
+ throw new DrillRuntimeException(e);
+ } catch (TException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
index 0a2c5ded0..91fec3b20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.hive;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.logical.StorageEngineConfigBase;
@@ -27,9 +28,12 @@ import java.util.Map;
@JsonTypeName("hive")
public class HiveStorageEngineConfig extends StorageEngineConfigBase {
- private Map<String, String> configProps;
+ @JsonProperty
+ public Map<String, String> configProps;
+ @JsonIgnore
private HiveConf hiveConf;
+ @JsonIgnore
public HiveConf getHiveConf() {
if (hiveConf == null) {
hiveConf = new HiveConf();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
new file mode 100644
index 000000000..8ff7c823c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class HiveSubScan extends AbstractBase implements SubScan {
+
+ @JsonProperty("splits")
+ public List<String> encodedSplits;
+ @JsonProperty("hive-table")
+ public Table table;
+ @JsonProperty("partitions")
+ public List<Partition> partitions;
+ @JsonIgnore
+ private List<InputSplit> inputSplits = Lists.newArrayList();
+ @JsonProperty("splitClass")
+ public List<String> splitClasses;
+
+ @JsonProperty("columns")
+ public List<FieldReference> columns;
+
+ @JsonCreator
+ public HiveSubScan(@JsonProperty("hive-table") Table table,
+ @JsonProperty("partition") List<Partition> partitions,
+ @JsonProperty("splits") List<String> encodedSplits,
+ @JsonProperty("splitClasses") List<String> splitClasses,
+ @JsonProperty("columns") List<FieldReference> columns) throws IOException, ReflectiveOperationException {
+ this.table = table;
+ this.partitions = partitions;
+ this.encodedSplits = encodedSplits;
+ this.splitClasses = splitClasses;
+ this.columns = columns;
+
+ for (int i = 0; i < encodedSplits.size(); i++) {
+ inputSplits.add(deserializeInputSplit(encodedSplits.get(i), splitClasses.get(i)));
+ }
+ }
+
+ public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{
+ InputSplit split;
+ if (Class.forName(className) == FileSplit.class) {
+ split = new FileSplit((Path) null, 0, 0, (String[])null);
+ } else {
+ split = (InputSplit) Class.forName(className).getConstructor().newInstance();
+ }
+ ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64));
+ split.readFields(byteArrayDataInput);
+ return split;
+ }
+
+ public List<FieldReference> getColumns() {
+ return columns;
+ }
+
+ public List<InputSplit> getInputSplits() {
+ return inputSplits;
+ }
+
+ @Override
+ public OperatorCost getCost() {
+ return new OperatorCost(1, 2, 1, 1);
+ }
+
+ @Override
+ public Size getSize() {
+ // TODO - this is wrong, need to populate correctly
+ return new Size(10,10);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitSubScan(this, value);
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ try {
+ return new HiveSubScan(table, partitions, encodedSplits, splitClasses, columns);
+ } catch (IOException | ReflectiveOperationException e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ @Override
+ public Iterator<PhysicalOperator> iterator() {
+ return Iterators.emptyIterator();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
new file mode 100644
index 000000000..385880408
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.hadoop.hive.metastore.api.*;
+
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("table")
+public class HiveTable {
+
+ @JsonIgnore
+ private Table table;
+
+ @JsonProperty
+ public String tableName;
+ @JsonProperty
+ public String dbName;
+ @JsonProperty
+ public String owner;
+ @JsonProperty
+ public int createTime;
+ @JsonProperty
+ public int lastAccessTime;
+ @JsonProperty
+ public int retention;
+ @JsonProperty
+ public StorageDescriptorWrapper sd;
+ @JsonProperty
+ public List<FieldSchemaWrapper> partitionKeys;
+ @JsonProperty
+ public Map<String,String> parameters;
+ @JsonProperty
+ public String viewOriginalText;
+ @JsonProperty
+ public String viewExpandedText;
+ @JsonProperty
+ public String tableType;
+
+ @JsonCreator
+ public HiveTable(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, @JsonProperty("createTime") int createTime,
+ @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd,
+ @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters,
+ @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText, @JsonProperty("tableType") String tableType
+ ) {
+ this.tableName = tableName;
+ this.dbName = dbName;
+ this.owner = owner;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.retention = retention;
+ this.sd = sd;
+ this.partitionKeys = partitionKeys;
+ this.parameters = parameters;
+ this.viewOriginalText = viewOriginalText;
+ this.viewExpandedText = viewExpandedText;
+ this.tableType = tableType;
+
+ List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper w : partitionKeys) partitionKeysUnwrapped.add(w.getFieldSchema());
+ StorageDescriptor sdUnwrapped = sd.getSd();
+ this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped,
+ parameters, viewOriginalText, viewExpandedText, tableType);
+ }
+
+ public HiveTable(Table table) {
+ if (table == null) return;
+ this.table = table;
+ this.tableName = table.getTableName();
+ this.dbName = table.getDbName();
+ this.owner = table.getOwner();
+ this.createTime = table.getCreateTime();
+ this.lastAccessTime = table.getLastAccessTime();
+ this.retention = table.getRetention();
+ this.sd = new StorageDescriptorWrapper(table.getSd());
+ this.partitionKeys = Lists.newArrayList();
+ for (FieldSchema f : table.getPartitionKeys()) this.partitionKeys.add(new FieldSchemaWrapper(f));
+ this.parameters = table.getParameters();
+ this.viewOriginalText = table.getViewOriginalText();
+ this.viewExpandedText = table.getViewExpandedText();
+ this.tableType = table.getTableType();
+ }
+
+ @JsonIgnore
+ public Table getTable() {
+ return table;
+ }
+
+ public static class HivePartition {
+
+ @JsonIgnore
+ private Partition partition;
+
+ @JsonProperty
+ public List<String> values;
+ @JsonProperty
+ public String tableName;
+ @JsonProperty
+ public String dbName;
+ @JsonProperty
+ public int createTime;
+ @JsonProperty
+ public int lastAccessTime;
+ @JsonProperty
+ public StorageDescriptorWrapper sd;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime,
+ @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd,
+ @JsonProperty("parameters") Map<String, String> parameters
+ ) {
+ this.values = values;
+ this.tableName = tableName;
+ this.dbName = dbName;
+ this.createTime = createTime;
+ this.lastAccessTime = lastAccessTime;
+ this.sd = sd;
+ this.parameters = parameters;
+
+ StorageDescriptor sdUnwrapped = sd.getSd();
+ this.partition = new org.apache.hadoop.hive.metastore.api.Partition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters);
+ }
+
+ public HivePartition(Partition partition) {
+ if (partition == null) return;
+ this.partition = partition;
+ this.values = partition.getValues();
+ this.tableName = partition.getTableName();
+ this.dbName = partition.getDbName();
+ this.createTime = partition.getCreateTime();
+ this.lastAccessTime = partition.getLastAccessTime();
+ this.sd = new StorageDescriptorWrapper(partition.getSd());
+ this.parameters = partition.getParameters();
+ }
+
+ @JsonIgnore
+ public Partition getPartition() {
+ return partition;
+ }
+ }
+
+ public static class StorageDescriptorWrapper {
+ @JsonIgnore
+ private StorageDescriptor sd;
+ @JsonProperty
+ public List<FieldSchemaWrapper> cols;
+ @JsonProperty
+ public String location;
+ @JsonProperty
+ public String inputFormat;
+ @JsonProperty
+ public String outputFormat;
+ @JsonProperty
+ public boolean compressed;
+ @JsonProperty
+ public int numBuckets;
+ @JsonProperty
+ public SerDeInfoWrapper serDeInfo;
+// @JsonProperty
+// public List<String> bucketCols;
+ @JsonProperty
+ public List<OrderWrapper> sortCols;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public StorageDescriptorWrapper(@JsonProperty("cols") List<FieldSchemaWrapper> cols, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat,
+ @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets,
+ @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo, @JsonProperty("sortCols") List<OrderWrapper> sortCols,
+ @JsonProperty("parameters") Map<String,String> parameters) {
+ this.cols = cols;
+ this.location = location;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
+ this.compressed = compressed;
+ this.numBuckets = numBuckets;
+ this.serDeInfo = serDeInfo;
+// this.bucketCols = bucketCols;
+ this.sortCols = sortCols;
+ this.parameters = parameters;
+ List<FieldSchema> colsUnwrapped = Lists.newArrayList();
+ for (FieldSchemaWrapper w: cols) colsUnwrapped.add(w.getFieldSchema());
+ SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo();
+ List<Order> sortColsUnwrapped = Lists.newArrayList();
+ for (OrderWrapper w : sortCols) sortColsUnwrapped.add(w.getOrder());
+// this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
+// bucketCols, sortColsUnwrapped, parameters);
+ this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped,
+ null, sortColsUnwrapped, parameters);
+ }
+
+ public StorageDescriptorWrapper(StorageDescriptor sd) {
+ this.sd = sd;
+ this.cols = Lists.newArrayList();
+ for (FieldSchema f : sd.getCols()) this.cols.add(new FieldSchemaWrapper(f));
+ this.location = sd.getLocation();
+ this.inputFormat = sd.getInputFormat();
+ this.outputFormat = sd.getOutputFormat();
+ this.compressed = sd.isCompressed();
+ this.numBuckets = sd.getNumBuckets();
+ this.serDeInfo = new SerDeInfoWrapper(sd.getSerdeInfo());
+// this.bucketCols = sd.getBucketCols();
+ this.sortCols = Lists.newArrayList();
+ for (Order o : sd.getSortCols()) this.sortCols.add(new OrderWrapper(o));
+ this.parameters = sd.getParameters();
+ }
+
+ @JsonIgnore
+ public StorageDescriptor getSd() {
+ return sd;
+ }
+
+ }
+
+ public static class SerDeInfoWrapper {
+ @JsonIgnore
+ private SerDeInfo serDeInfo;
+ @JsonProperty
+ public String name;
+ @JsonProperty
+ public String serializationLib;
+ @JsonProperty
+ public Map<String,String> parameters;
+
+ @JsonCreator
+ public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) {
+ this.name = name;
+ this.serializationLib = serializationLib;
+ this.parameters = parameters;
+ this.serDeInfo = new SerDeInfo(name, serializationLib, parameters);
+ }
+
+ public SerDeInfoWrapper(SerDeInfo serDeInfo) {
+ this.serDeInfo = serDeInfo;
+ this.name = serDeInfo.getName();
+ this.serializationLib = serDeInfo.getSerializationLib();
+ this.parameters = serDeInfo.getParameters();
+ }
+
+ @JsonIgnore
+ public SerDeInfo getSerDeInfo() {
+ return serDeInfo;
+ }
+ }
+
+ public static class FieldSchemaWrapper {
+ @JsonIgnore
+ private FieldSchema fieldSchema;
+ @JsonProperty
+ public String name;
+ @JsonProperty
+ public String type;
+ @JsonProperty
+ public String comment;
+
+ @JsonCreator
+ public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) {
+ this.name = name;
+ this.type = type;
+ this.comment = comment;
+ this.fieldSchema = new FieldSchema(name, type, comment);
+ }
+
+ public FieldSchemaWrapper(FieldSchema fieldSchema) {
+ this.fieldSchema = fieldSchema;
+ this.name = fieldSchema.getName();
+ this.type = fieldSchema.getType();
+ this.comment = fieldSchema.getComment();
+ }
+
+ @JsonIgnore
+ public FieldSchema getFieldSchema() {
+ return fieldSchema;
+ }
+ }
+
+ public static class OrderWrapper {
+ @JsonIgnore
+ private Order ord;
+ @JsonProperty
+ public String col;
+ @JsonProperty
+ public int order;
+
+ @JsonCreator
+ public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) {
+ this.col = col;
+ this.order = order;
+ }
+
+ public OrderWrapper(Order ord) {
+ this.ord = ord;
+ this.col = ord.getCol();
+ this.order = ord.getOrder();
+ }
+
+ @JsonIgnore
+ public Order getOrder() {
+ return ord;
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
new file mode 100644
index 000000000..1e47684cd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+
+import java.io.IOException;
+import java.util.List;
+
+public class HiveTextRecordReader extends HiveRecordReader {
+
+ public final byte delimiter;
+ public final List<Integer> columnIds;
+ private final int numCols;
+
+ public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException {
+ super(table, partition, inputSplit, columns, context);
+ String d = table.getSd().getSerdeInfo().getParameters().get("field.delim");
+ if (d != null) {
+ delimiter = d.getBytes()[0];
+ } else {
+ delimiter = (byte) 1;
+ }
+ assert delimiter > 0;
+ List<Integer> ids = Lists.newArrayList();
+ for (int i = 0; i < tableColumns.size(); i++) {
+ if (columnNames.contains(tableColumns.get(i))) {
+ ids.add(i);
+ }
+ }
+ columnIds = ids;
+ numCols = tableColumns.size();
+ }
+
+ public boolean setValue(PrimitiveObjectInspector.PrimitiveCategory pCat, ValueVector vv, int index, byte[] bytes, int start) {
+ switch(pCat) {
+ case BINARY:
+ throw new UnsupportedOperationException();
+ case BOOLEAN:
+ throw new UnsupportedOperationException();
+ case BYTE:
+ throw new UnsupportedOperationException();
+ case DECIMAL:
+ throw new UnsupportedOperationException();
+ case DOUBLE:
+ throw new UnsupportedOperationException();
+ case FLOAT:
+ throw new UnsupportedOperationException();
+ case INT: {
+ int value = 0;
+ byte b;
+ for (int i = start; (b = bytes[i]) != delimiter; i++) {
+ value = (value * 10) + b - 48;
+ }
+ ((IntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors
+ return true;
+ }
+ case LONG: {
+ long value = 0;
+ byte b;
+ for (int i = start; (b = bytes[i]) != delimiter; i++) {
+ value = (value * 10) + b - 48;
+ }
+ ((BigIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors
+ return true;
+ }
+ case SHORT:
+ throw new UnsupportedOperationException();
+ case STRING: {
+ int end = start;
+ for (int i = start; i < bytes.length; i++) {
+ if (bytes[i] == delimiter) {
+ end = i;
+ break;
+ }
+ end = bytes.length;
+ }
+ return ((VarCharVector) vv).getMutator().setSafe(index, bytes, start, end - start);
+ }
+ case TIMESTAMP:
+ throw new UnsupportedOperationException();
+
+ default:
+ throw new UnsupportedOperationException("Could not determine type");
+ }
+ }
+
+
+ @Override
+ public int next() {
+ for (ValueVector vv : vectors) {
+ VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT);
+ }
+ try {
+ int recordCount = 0;
+ if (redoRecord != null) {
+ int length = ((Text) value).getLength();
+ byte[] bytes = ((Text) value).getBytes();
+ int[] delimPositions = new int[numCols];
+ delimPositions[0] = -1;
+ int p = 0;
+ for (int i = 0; i < length; i++) {
+ if (bytes[i] == delimiter) {
+ delimPositions[p++] = i;
+ }
+ }
+ for (int id : columnIds) {
+ boolean success = setValue(primitiveCategories.get(id), vectors.get(id), recordCount, bytes, delimPositions[id]);
+ if (!success) {
+ throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnNames.get(id)));
+ }
+
+ }
+ redoRecord = null;
+ }
+ while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+ int length = ((Text) value).getLength();
+ byte[] bytes = ((Text) value).getBytes();
+ int[] delimPositions = new int[numCols + 1];
+ delimPositions[0] = -1;
+ int p = 1;
+ for (int i = 0; i < length; i++) {
+ if (bytes[i] == delimiter) {
+ delimPositions[p++] = i;
+ }
+ }
+ for (int i = 0; i < columnIds.size(); i++) {
+ int id = columnIds.get(i);
+ boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1);
+ if (!success) {
+ redoRecord = value;
+ if (partition != null) populatePartitionVectors(recordCount);
+ return recordCount;
+ }
+ }
+ recordCount++;
+ }
+ if (partition != null) populatePartitionVectors(recordCount);
+ return recordCount;
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
index 209961dd6..86be49e32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.store.ClassPathFileSystem;
import org.apache.drill.exec.store.SchemaProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import com.beust.jcommander.internal.Lists;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index aa68752b7..aa68752b7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c801163ac..4dd08c132 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -82,5 +82,6 @@ drill.exec: {
delete: false,
size: 100000000
}
- }
+ },
+ cache.hazel.subnets: ["*.*.*.*"]
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java
index 38ec007a8..71e6283e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java
@@ -15,25 +15,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.hive;
+package org.apache.drill.exec;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.drill.exec.client.QuerySubmitter;
+import org.junit.Ignore;
+import org.junit.Test;
-import com.beust.jcommander.internal.Lists;
+import java.io.IOException;
-public class HiveSchemaProvider implements SchemaProvider{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaProvider.class);
+/**
+ * Created with IntelliJ IDEA.
+ * User: sphillips
+ * Date: 1/24/14
+ * Time: 3:46 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public class TestPlan {
- final HiveStorageEngineConfig configuration;
+ String location = "/Users/sphillips/hive-lineitem-orderkey";
+ String type = "physical";
+ String zkQuorum = null;
+ boolean local = true;
+ int bits = 1;
- public HiveSchemaProvider(HiveStorageEngineConfig configuration, DrillConfig config){
- this.configuration = configuration;
- }
- @Override
- public Object getSelectionBaseOnName(String tableName) {
- HiveReadEntry re = new HiveReadEntry(configuration.getHiveConf(), tableName);
- return Lists.newArrayList(re);
+ @Test
+ @Ignore
+ public void testSubmitPlan() throws Exception {
+ QuerySubmitter submitter = new QuerySubmitter();
+ submitter.submitQuery(location, type, zkQuorum, local, bits);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java
new file mode 100644
index 000000000..c6edc2000
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sphillips
+ * Date: 1/23/14
+ * Time: 5:22 AM
+ * To change this template use File | Settings | File Templates.
+ */
+public class TestHiveScan extends PopUnitTestBase {
+ @Ignore
+ @Test
+ public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+ Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) {
+
+ bit1.run();
+ bit2.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/hive/test.json"),
+ Charsets.UTF_8));
+ int count = 0;
+ for(QueryResultBatch b : results) {
+ if (b.getHeader().getRowCount() != 0)
+ count += b.getHeader().getRowCount();
+ }
+ assertEquals(100, count);
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/resources/hive/test.json b/exec/java-exec/src/test/resources/hive/test.json
new file mode 100644
index 000000000..a039d9e39
--- /dev/null
+++ b/exec/java-exec/src/test/resources/hive/test.json
@@ -0,0 +1,75 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"hive-scan",
+ storageengine: { type: "hive"},
+ hive-table: {
+ "tableName" : "nation",
+ "dbName" : "default",
+ "owner" : "root",
+ "createTime" : 1386876893,
+ "lastAccessTime" : 0,
+ "retention" : 0,
+ "sd" : {
+ "cols" : [ {
+ "name" : "n_nationkey",
+ "type" : "bigint",
+ "comment" : null
+ }, {
+ "name" : "n_name",
+ "type" : "string",
+ "comment" : null
+ }, {
+ "name" : "n_regionkey",
+ "type" : "bigint",
+ "comment" : null
+ }, {
+ "name" : "n_comment",
+ "type" : "string",
+ "comment" : null
+ } ],
+ "location" : "maprfs:/user/hive/warehouse/nation",
+ "inputFormat" : "org.apache.hadoop.mapred.TextInputFormat",
+ "outputFormat" : "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
+ "compressed" : false,
+ "numBuckets" : 0,
+ "serDeInfo" : {
+ "name" : null,
+ "serializationLib" : "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+ "parameters" : {
+ "serialization.format" : "|",
+ "field.delim" : "|"
+ }
+ },
+ "sortCols" : [ ],
+ "parameters" : { }
+ },
+ "partitionKeys" : [ ],
+ "parameters" : {
+ "numPartitions" : "0",
+ "numFiles" : "1",
+ "transient_lastDdlTime" : "1386877487",
+ "totalSize" : "2224",
+ "numRows" : "0",
+ "rawDataSize" : "0"
+ },
+ "viewOriginalText" : null,
+ "viewExpandedText" : null,
+ "tableType" : "MANAGED_TABLE"
+ }
+ },
+ {
+ @id: 2,
+ child: 1,
+ pop: "screen"
+ }
+ ]
+} \ No newline at end of file