| author | Steven Phillips <sphillips@maprtech.com> | 2014-01-23 18:32:03 -0800 |
|---|---|---|
| committer | Jacques Nadeau <jacques@apache.org> | 2014-03-03 23:21:50 -0800 |
| commit | cdf46fd36fdfc2e3029a6b2e077330c665e43c2e | |
| tree | d69cb592765321c67c1c6571e89c840772e2c7f8 /exec/java-exec/src | |
| parent | a9a7ea84c99d8a9efcccc7d9a870121a26212b49 | |
DRILL-357: Hive Storage Engine phase 2 - hive record reader
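The patch adds a HiveRecordReader that obtains an InputFormat and SerDe pair from the table's metastore properties, drives a Hadoop `mapred` RecordReader over each split, deserializes rows through a StructObjectInspector, and copies primitive fields into Drill value vectors (using the new capacity-checked `setSafe` methods when a batch fills up). The sketch below condenses that read path; it mirrors the HiveInputReader scratch class and HiveRecordReader.init() added in this patch rather than reproducing them verbatim. The metastore URI is a placeholder and the `HiveReadSketch` class name is invented for illustration.

```java
// Illustrative sketch only: condensed from the HiveInputReader scratch class and
// HiveRecordReader.init() in this patch. Assumes hive-metastore, hive-serde and
// hadoop-mapred on the classpath and a reachable metastore at the placeholder URI.
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.*;

public class HiveReadSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.uris", "thrift://metastore-host:9083");   // placeholder URI
    Table table = new HiveMetaStoreClient(conf).getTable("default", "nation");

    // The table's metastore properties configure both the InputFormat and the SerDe.
    Properties props = MetaStoreUtils.getTableMetadata(table);
    JobConf job = new JobConf();
    for (Object key : props.keySet()) {
      job.set((String) key, (String) props.get(key));
    }
    InputFormat format =
        (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
    job.setInputFormat(format.getClass());
    FileInputFormat.addInputPath(job, new Path(table.getSd().getLocation()));

    SerDe serde =
        (SerDe) Class.forName(table.getSd().getSerdeInfo().getSerializationLib()).getConstructor().newInstance();
    serde.initialize(job, props);
    StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();

    // One record reader per split; each row is deserialized into its struct fields.
    // HiveRecordReader does the same, then writes each primitive field into a value vector.
    for (InputSplit split : job.getInputFormat().getSplits(job, 1)) {
      RecordReader reader = job.getInputFormat().getRecordReader(split, job, Reporter.NULL);
      Object key = reader.createKey();
      Object value = reader.createValue();
      while (reader.next(key, value)) {
        System.out.println(inspector.getStructFieldsDataAsList(serde.deserialize((Writable) value)));
      }
      reader.close();
    }
  }
}
```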
Diffstat (limited to 'exec/java-exec/src')
20 files changed, 2079 insertions, 29 deletions
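One detail worth calling out before the per-file hunks: HiveScan.getSpecificScan() ships each assigned InputSplit to its minor fragment as Base64 text plus the split's class name, and HiveSubScan rebuilds the split via Writable readFields(). Below is a self-contained sketch of that round trip, adapted from the serializeInputSplit/deserializeInputSplit methods in this patch; the sample path, host name, and the `SplitRoundTrip` class name are illustrative only.

```java
// Sketch of the split shipping used by HiveScan/HiveSubScan in this patch: an InputSplit
// is serialized to Base64 text for the physical plan, then rebuilt on the executing node
// from its class name plus the encoded bytes. Requires hadoop-mapred, guava, commons-codec.
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;

public class SplitRoundTrip {

  public static String serializeInputSplit(InputSplit split) throws Exception {
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    split.write(out);                                    // Writable serialization
    return Base64.encodeBase64String(out.toByteArray());
  }

  public static InputSplit deserializeInputSplit(String base64, String className) throws Exception {
    InputSplit split;
    if (Class.forName(className) == FileSplit.class) {
      // FileSplit is special-cased, mirroring the patch: build an empty split and let
      // readFields() repopulate it rather than relying on a no-arg constructor.
      split = new FileSplit((Path) null, 0, 0, (String[]) null);
    } else {
      split = (InputSplit) Class.forName(className).getConstructor().newInstance();
    }
    ByteArrayDataInput in = ByteStreams.newDataInput(Base64.decodeBase64(base64));
    split.readFields(in);
    return split;
  }

  public static void main(String[] args) throws Exception {
    // Placeholder path and location; real splits come from InputFormat.getSplits().
    FileSplit original = new FileSplit(new Path("/tmp/nation.tbl"), 0, 1024, new String[]{"node1"});
    String encoded = serializeInputSplit(original);
    InputSplit copy = deserializeInputSplit(encoded, FileSplit.class.getCanonicalName());
    System.out.println(copy.getLength() + " bytes starting at " + ((FileSplit) copy).getStart());
  }
}
```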
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 57927a71d..b059d89fa 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -260,7 +260,43 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V assert index >= 0; int currentOffset = offsetVector.getAccessor().get(index); offsetVector.getMutator().set(index + 1, currentOffset + bytes.length); - data.setBytes(currentOffset, bytes); + data.setBytes(currentOffset, bytes, 0, bytes.length); + } + + public boolean setSafe(int index, byte[] bytes) { + assert index >= 0; + int currentOffset = offsetVector.getAccessor().get(index); + if (data.capacity() < currentOffset + bytes.length) return false; + offsetVector.getMutator().set(index + 1, currentOffset + bytes.length); + data.setBytes(currentOffset, bytes, 0, bytes.length); + return true; + } + + /** + * Set the variable length element at the specified index to the supplied byte array. + * + * @param index position of the bit to set + * @param bytes array of bytes to write + * @param start start index of bytes to write + * @param length length of bytes to write + */ + public void set(int index, byte[] bytes, int start, int length) { + assert index >= 0; + int currentOffset = offsetVector.getAccessor().get(index); + offsetVector.getMutator().set(index + 1, currentOffset + length); + data.setBytes(currentOffset, bytes, start, length); + } + + public boolean setSafe(int index, byte[] bytes, int start, int length) { + assert index >= 0; + + int currentOffset = offsetVector.getAccessor().get(index); + + if (data.capacity() < currentOffset + length) return false; + + offsetVector.getMutator().set(index + 1, currentOffset + length); + data.setBytes(currentOffset, bytes, start, length); + return true; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java index e9946dfb8..688e68005 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java @@ -28,7 +28,7 @@ public class TemplateClassDefinition<T>{ private final Class<T> iface; private final Class<?> template; private final SignatureHolder signature; - private final AtomicLong classNumber = new AtomicLong(0); + private static final AtomicLong classNumber = new AtomicLong(0); public <X extends T> TemplateClassDefinition(Class<T> iface, Class<X> template) { super(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 8c1487ca0..94fcac51c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -72,6 +72,7 @@ public class Wrapper { public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) { Preconditions.checkState(!endpointsAssigned); + Preconditions.checkNotNull(endpoint); EndpointAffinity ea = endpointAffinity.get(endpoint); if (ea == null) { ea = new EndpointAffinity(endpoint); @@ -149,6 +150,7 @@ public class Wrapper { int start = ThreadLocalRandom.current().nextInt(div); // 
round robin with random start. for (int i = start; i < start + width; i++) { + Preconditions.checkNotNull(all.get(i % div)); endpoints.add(all.get(i % div)); } } else { @@ -156,6 +158,7 @@ public class Wrapper { Collections.sort(values); values = Lists.reverse(values); for (int i = 0; i < width; i++) { + Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint()); endpoints.add(values.get(i%values.size()).getEndpoint()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java new file mode 100644 index 000000000..880970643 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveInputReader.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive; + +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.commons.codec.binary.Base64; +import org.apache.drill.common.util.DataOutputOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.hbase.HBaseSerDe; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.*; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.util.*; + +public class HiveInputReader { + public static void main(String args[]) throws Exception { +/* + String[] columnNames = {"n_nationkey", "n_name", "n_regionkey", "n_comment"}; + String[] columnTypes = {"bigint", "string", "bigint", "string"}; + + List<FieldSchema> cols = Lists.newArrayList(); + + for (int i = 0; i < columnNames.length; i++) { + cols.add(new FieldSchema(columnNames[i], columnTypes[i], null)); + } + String location = 
"file:///tmp/nation_s"; + String inputFormat = TextInputFormat.class.getCanonicalName(); + String serdeLib = LazySimpleSerDe.class.getCanonicalName(); +// String inputFormat = HiveHBaseTableInputFormat.class.getCanonicalName(); +// String serdeLib = HBaseSerDe.class.getCanonicalName(); + Map<String, String> serdeParams = new HashMap(); +// serdeParams.put("serialization.format", "1"); +// serdeParams.put("hbase.columns.mapping", ":key,f:name,f:regionkey,f:comment"); + serdeParams.put("serialization.format", "|"); + serdeParams.put("field.delim", "|"); + + + Map<String, String> tableParams = new HashMap(); + tableParams.put("hbase.table.name", "nation"); + SerDeInfo serDeInfo = new SerDeInfo(null, serdeLib, serdeParams); + StorageDescriptor storageDescriptor = new StorageDescriptor(cols, location, inputFormat, null, false, -1, serDeInfo, null, null, null); + Table table = new Table("table", "default", "sphillips", 0, 0, 0, storageDescriptor, new ArrayList<FieldSchema>(), tableParams, null, null, "MANAGED_TABLE"); + Properties properties = MetaStoreUtils.getTableMetadata(table); + */ + + HiveConf conf = new HiveConf(); + conf.set("hive.metastore.uris", "thrift://10.10.31.51:9083"); + HiveMetaStoreClient client = new HiveMetaStoreClient(conf); + Table table = client.getTable("default", "nation"); + Properties properties = MetaStoreUtils.getTableMetadata(table); + + Path path = new Path(table.getSd().getLocation()); + JobConf job = new JobConf(); + for (Object obj : properties.keySet()) { + job.set((String) obj, (String) properties.get(obj)); + } +// job.set("hbase.zookeeper.quorum", "10.10.31.51"); +// job.set("hbase.zookeeper.property.clientPort", "5181"); + InputFormat f = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance(); + job.setInputFormat(f.getClass()); + FileInputFormat.addInputPath(job, path); + InputFormat format = job.getInputFormat(); + SerDe serde = (SerDe) Class.forName(table.getSd().getSerdeInfo().getSerializationLib()).getConstructor().newInstance(); + serde.initialize(job, properties); + ObjectInspector inspector = serde.getObjectInspector(); + ObjectInspector.Category cat = inspector.getCategory(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(inspector); + List<String> columns = null; + List<TypeInfo> colTypes = null; + List<ObjectInspector> fieldObjectInspectors = Lists.newArrayList(); + + switch(typeInfo.getCategory()) { + case STRUCT: + columns = ((StructTypeInfo) typeInfo).getAllStructFieldNames(); + colTypes = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos(); + for (int i = 0; i < columns.size(); i++) { + System.out.print(columns.get(i)); + System.out.print(" "); + System.out.print(colTypes.get(i)); + } + System.out.println(""); + for (StructField field : ((StructObjectInspector)inspector).getAllStructFieldRefs()) { + fieldObjectInspectors.add(field.getFieldObjectInspector()); + } + } + + for (InputSplit split : format.getSplits(job, 1)) { + String encoded = serializeInputSplit(split); + System.out.println(encoded); + InputSplit newSplit = deserializeInputSplit(encoded, split.getClass().getCanonicalName()); + System.out.print("Length: " + newSplit.getLength() + " "); + System.out.print("Locations: "); + for (String loc : newSplit.getLocations()) System.out.print(loc + " " ); + System.out.println(); + } + + for (InputSplit split : format.getSplits(job, 1)) { + RecordReader reader = format.getRecordReader(split, job, Reporter.NULL); + Object key = reader.createKey(); + Object value = 
reader.createValue(); + int count = 0; + while (reader.next(key, value)) { + List<Object> values = ((StructObjectInspector) inspector).getStructFieldsDataAsList(serde.deserialize((Writable) value)); + StructObjectInspector sInsp = (StructObjectInspector) inspector; + Object obj = sInsp.getStructFieldData(serde.deserialize((Writable) value) , sInsp.getStructFieldRef("n_name")); + System.out.println(obj); + /* + for (Object obj : values) { + PrimitiveObjectInspector.PrimitiveCategory pCat = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveCategory(); + Object pObj = ((PrimitiveObjectInspector)fieldObjectInspectors.get(count)).getPrimitiveJavaObject(obj); + System.out.print(pObj + " "); + } + */ + System.out.println(""); + } + } + } + + public static String serializeInputSplit(InputSplit split) throws IOException { + ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput(); + split.write(byteArrayOutputStream); + return Base64.encodeBase64String(byteArrayOutputStream.toByteArray()); + } + + public static InputSplit deserializeInputSplit(String base64, String className) throws Exception { + InputSplit split; + if (Class.forName(className) == FileSplit.class) { + split = new FileSplit((Path) null, 0, 0, (String[])null); + } else { + split = (InputSplit) Class.forName(className).getConstructor().newInstance(); + } + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64)); + split.readFields(byteArrayDataInput); + return split; + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java index 41a4d3d2b..6211e2186 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java @@ -17,19 +17,47 @@ */ package org.apache.drill.exec.store.hive; +import com.beust.jcommander.internal.Lists; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.ReadEntry; import org.apache.drill.exec.physical.base.Size; -import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; + +import java.util.List; public class HiveReadEntry implements ReadEntry { - private final HiveConf conf; - private final String table; - private Size size; - public HiveReadEntry(HiveConf conf, String table) { - this.conf = conf; + @JsonProperty("table") + public HiveTable table; + @JsonProperty("partitions") + public List<HiveTable.HivePartition> partitions; + + @JsonIgnore + private List<Partition> partitionsUnwrapped = Lists.newArrayList(); + + @JsonCreator + public HiveReadEntry(@JsonProperty("table") HiveTable table, @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) { this.table = table; + this.partitions = partitions; + if (partitions != null) { + for(HiveTable.HivePartition part : partitions) { + partitionsUnwrapped.add(part.getPartition()); + } + } + } + + @JsonIgnore + public Table getTable() { + return table.getTable(); + } + + @JsonIgnore + public List<Partition> getPartitions() { + return partitionsUnwrapped; } @Override @@ -40,11 +68,10 @@ public class HiveReadEntry implements ReadEntry { @Override public Size 
getSize() { - if (size != null) { - // TODO: contact the metastore and find the size of the data in table - size = new Size(1, 1); - } + // TODO: contact the metastore and find the size of the data in table + Size size = new Size(1, 1); return size; } } + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java new file mode 100644 index 000000000..ef7266c52 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntryOld.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive; + +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.ReadEntry; +import org.apache.drill.exec.physical.base.Size; +import org.apache.hadoop.hive.conf.HiveConf; + +public class HiveReadEntryOld implements ReadEntry { + private final HiveConf conf; + private final String table; + private Size size; + + public HiveReadEntryOld(HiveConf conf, String table) { + this.conf = conf; + this.table = table; + } + + @Override + public OperatorCost getCost() { + // TODO: need to come up with way to calculate the cost for Hive tables + return new OperatorCost(1, 1, 2, 2); + } + + @Override + public Size getSize() { + if (size != null) { + // TODO: contact the metastore and find the size of the data in table + size = new Size(1, 1); + } + + return size; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java new file mode 100644 index 000000000..0a31a1252 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -0,0 +1,473 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive; + +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.vector.allocator.VectorAllocator; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class HiveRecordReader implements RecordReader { + + protected Table table; + protected Partition partition; + protected InputSplit inputSplit; + protected FragmentContext context; + protected List<FieldReference> columns; + protected List<String> columnNames; + protected List<String> partitionNames = Lists.newArrayList(); + protected List<String> selectedPartitionNames = Lists.newArrayList(); + protected List<String> selectedPartitionTypes = Lists.newArrayList(); + protected List<String> tableColumns; + protected SerDe serde; + protected StructObjectInspector sInspector; + protected List<PrimitiveObjectInspector> fieldInspectors = Lists.newArrayList(); + protected List<PrimitiveCategory> primitiveCategories = Lists.newArrayList(); + protected Object key, value; + protected org.apache.hadoop.mapred.RecordReader reader; + protected List<ValueVector> vectors = Lists.newArrayList(); + protected List<ValueVector> pVectors = Lists.newArrayList(); + protected Object redoRecord; + List<Object> partitionValues = Lists.newArrayList(); + + protected static final int TARGET_RECORD_COUNT = 4000; + + public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException { + this.table = table; + this.partition = partition; + this.inputSplit = inputSplit; + this.context = context; + this.columns 
= columns; + init(); + } + + private void init() throws ExecutionSetupException { + Properties properties; + JobConf job = new JobConf(); + if (partition != null) { + properties = MetaStoreUtils.getPartitionMetadata(partition, table); + } else { + properties = MetaStoreUtils.getTableMetadata(table); + } + for (Object obj : properties.keySet()) { + job.set((String) obj, (String) properties.get(obj)); + } + InputFormat format; + String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib(); + String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat(); + try { + format = (InputFormat) Class.forName(inputFormatName).getConstructor().newInstance(); + Class c = Class.forName(sLib); + serde = (SerDe) c.getConstructor().newInstance(); + serde.initialize(job, properties); + } catch (ReflectiveOperationException | SerDeException e) { + throw new ExecutionSetupException("Unable to instantiate InputFormat", e); + } + job.setInputFormat(format.getClass()); + + if (partition != null) { + List<FieldSchema> partitionKeys = table.getPartitionKeys(); + for (FieldSchema field : partitionKeys) { + partitionNames.add(field.getName()); + } + } + + try { + ObjectInspector oi = serde.getObjectInspector(); + if (oi.getCategory() != ObjectInspector.Category.STRUCT) { + throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory())); + } + sInspector = (StructObjectInspector) oi; + StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(sInspector); + if (columns == null) { + columnNames = sTypeInfo.getAllStructFieldNames(); + tableColumns = columnNames; + } else { + tableColumns = sTypeInfo.getAllStructFieldNames(); + List<Integer> columnIds = Lists.newArrayList(); + columnNames = Lists.newArrayList(); + for (FieldReference field : columns) { + String columnName = field.getPath().toString(); + if (!tableColumns.contains(columnName)) { + if (partition != null && partitionNames.contains(columnName)) { + selectedPartitionNames.add(columnName); + } else { + throw new ExecutionSetupException(String.format("Column %s does not exist", columnName)); + } + } else { + columnIds.add(tableColumns.indexOf(columnName)); + columnNames.add(columnName); + } + } + ColumnProjectionUtils.appendReadColumnIDs(job, columnIds); + ColumnProjectionUtils.appendReadColumnNames(job, columnNames); + } + for (String columnName : columnNames) { + ObjectInspector poi = sInspector.getStructFieldRef(columnName).getFieldObjectInspector(); + if(poi.getCategory() != ObjectInspector.Category.PRIMITIVE) { + throw new UnsupportedOperationException(String.format("%s type not supported", poi.getCategory())); + } + PrimitiveObjectInspector pInspector = (PrimitiveObjectInspector) poi; + fieldInspectors.add(pInspector); + primitiveCategories.add(pInspector.getPrimitiveCategory()); + } + + if (columns == null) { + selectedPartitionNames = partitionNames; + } + + if (partition != null) { + for (int i = 0; i < table.getPartitionKeys().size(); i++) { + FieldSchema field = table.getPartitionKeys().get(i); + if (selectedPartitionNames.contains(field.getName())) { + selectedPartitionTypes.add(field.getType()); + partitionValues.add(convertPartitionType(field.getType(), partition.getValues().get(i))); + } + } + } + } catch (SerDeException e) { + throw new ExecutionSetupException(e); + } + try { + reader = format.getRecordReader(inputSplit, job, Reporter.NULL); + 
} catch (IOException e) { + throw new ExecutionSetupException("Failed to get Recordreader", e); + } + key = reader.createKey(); + value = reader.createValue(); + } + + @Override + public void setup(OutputMutator output) throws ExecutionSetupException { + output.removeAllFields(); + try { + for (int i = 0; i < columnNames.size(); i++) { + PrimitiveCategory pCat = primitiveCategories.get(i); + MaterializedField field = MaterializedField.create(new SchemaPath(columnNames.get(i), ExpressionPosition.UNKNOWN), getMajorType(pCat)); + ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator()); + vectors.add(vv); + output.addField(vv); + } + for (int i = 0; i < selectedPartitionNames.size(); i++) { + String type = selectedPartitionTypes.get(i); + MaterializedField field = MaterializedField.create(new SchemaPath(selectedPartitionNames.get(i), ExpressionPosition.UNKNOWN), getMajorType(type)); + ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator()); + pVectors.add(vv); + output.addField(vv); + } + output.setNewSchema(); + } catch(SchemaChangeException e) { + throw new ExecutionSetupException(e); + } + } + + protected void populatePartitionVectors(int recordCount) { + for (int i = 0; i < pVectors.size(); i++) { + int size = 50; + ValueVector vector = pVectors.get(i); + Object val = partitionValues.get(i); + if (selectedPartitionTypes.get(i).equals("string") || selectedPartitionTypes.get(i).equals("binary")) { + size = ((byte[]) partitionValues.get(i)).length; + } + VectorAllocator.getAllocator(vector, size).alloc(recordCount); + switch(selectedPartitionTypes.get(i)) { + case "boolean": { + BitVector v = (BitVector) vector; + Boolean value = (Boolean) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value ? 1 : 0); + } + break; + } + case "tinyint": { + TinyIntVector v = (TinyIntVector) vector; + byte value = (byte) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + case "double": { + Float8Vector v = (Float8Vector) vector; + double value = (double) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + case "float": { + Float4Vector v = (Float4Vector) vector; + float value = (float) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + case "int": { + IntVector v = (IntVector) vector; + int value = (int) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + case "bigint": { + BigIntVector v = (BigIntVector) vector; + long value = (long) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + case "smallint": { + SmallIntVector v = (SmallIntVector) vector; + short value = (short) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + case "string": { + VarCharVector v = (VarCharVector) vector; + byte[] value = (byte[]) val; + for (int j = 0; j < recordCount; j++) { + v.getMutator().set(j, value); + } + break; + } + default: + throw new UnsupportedOperationException("Could not determine type: " + selectedPartitionTypes.get(i)); + } + vector.getMutator().setValueCount(recordCount); + } + } + + private Object convertPartitionType(String type, String value) { + switch (type) { + case "boolean": + return Boolean.parseBoolean(value); + case "tinyint": + return Byte.parseByte(value); + case "double": + return Double.parseDouble(value); + case "float": + return Float.parseFloat(value); + case "int": + 
return Integer.parseInt(value); + case "bigint": + return Long.parseLong(value); + case "smallint": + return Short.parseShort(value); + case "string": + return value.getBytes(); + default: + throw new UnsupportedOperationException("Could not determine type: " + type); + } + } + + public static TypeProtos.MajorType getMajorType(String type) { + switch(type) { + case "binary": + return Types.required(TypeProtos.MinorType.VARBINARY); + case "boolean": + return Types.required(TypeProtos.MinorType.BIT); + case "tinyint": + return Types.required(TypeProtos.MinorType.TINYINT); + case "decimal": + return Types.required(TypeProtos.MinorType.DECIMAL16); + case "double": + return Types.required(TypeProtos.MinorType.FLOAT8); + case "float": + return Types.required(TypeProtos.MinorType.FLOAT4); + case "int": + return Types.required(TypeProtos.MinorType.INT); + case "bigint": + return Types.required(TypeProtos.MinorType.BIGINT); + case "smallint": + return Types.required(TypeProtos.MinorType.SMALLINT); + case "string": + return Types.required(TypeProtos.MinorType.VARCHAR); + case "varchar": + + default: + throw new UnsupportedOperationException("Could not determine type: " + type); + } + } + + public static TypeProtos.MajorType getMajorType(PrimitiveCategory pCat) { + switch(pCat) { + case BINARY: + return Types.required(TypeProtos.MinorType.VARBINARY); + case BOOLEAN: + return Types.required(TypeProtos.MinorType.BIT); + case BYTE: + return Types.required(TypeProtos.MinorType.TINYINT); + case DECIMAL: + return Types.required(TypeProtos.MinorType.DECIMAL16); + case DOUBLE: + return Types.required(TypeProtos.MinorType.FLOAT8); + case FLOAT: + return Types.required(TypeProtos.MinorType.FLOAT4); + case INT: + return Types.required(TypeProtos.MinorType.INT); + case LONG: + return Types.required(TypeProtos.MinorType.BIGINT); + case SHORT: + return Types.required(TypeProtos.MinorType.SMALLINT); + case STRING: + return Types.required(TypeProtos.MinorType.VARCHAR); + case TIMESTAMP: + + default: + throw new UnsupportedOperationException("Could not determine type"); + } + } + + public boolean setValue(PrimitiveCategory pCat, ValueVector vv, int index, Object fieldValue) { + switch(pCat) { + case BINARY: + ((VarBinaryVector) vv).getMutator().setSafe(index, (byte[]) fieldValue); + case BOOLEAN: + boolean isSet = (boolean) fieldValue; + return ((BitVector) vv).getMutator().setSafe(index, isSet ? 
1 : 0 ); + case BYTE: + return ((TinyIntVector) vv).getMutator().setSafe(index, (byte) fieldValue); + case DECIMAL: + throw new UnsupportedOperationException(); + case DOUBLE: + return ((Float8Vector) vv).getMutator().setSafe(index, (double) fieldValue); + case FLOAT: + return ((Float4Vector) vv).getMutator().setSafe(index, (float) fieldValue); + case INT: + return ((IntVector) vv).getMutator().setSafe(index, (int) fieldValue); + case LONG: + return ((BigIntVector) vv).getMutator().setSafe(index, (long) fieldValue); + case SHORT: + return ((SmallIntVector) vv).getMutator().setSafe(index, (short) fieldValue); + case STRING: + int len = ((Text) fieldValue).getLength(); + byte[] bytes = ((Text) fieldValue).getBytes(); + return ((VarCharVector) vv).getMutator().setSafe(index, bytes, 0, len); + case TIMESTAMP: + throw new UnsupportedOperationException(); + + default: + throw new UnsupportedOperationException("Could not determine type"); + } + } + + @Override + public int next() { + for (ValueVector vv : vectors) { + VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT); + } + try { + int recordCount = 0; + if (redoRecord != null) { + Object deSerializedValue = serde.deserialize((Writable) redoRecord); + for (int i = 0; i < columnNames.size(); i++) { + Object obj; + String columnName = columnNames.get(i); + if (primitiveCategories.get(i) == PrimitiveCategory.STRING) { + obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); + } else { + obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); + } + boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj); + if (!success) { + throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnName)); + } + } + redoRecord = null; + recordCount++; + } + while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) { + Object deSerializedValue = serde.deserialize((Writable) value); + for (int i = 0; i < columnNames.size(); i++) { + Object obj; + String columnName = columnNames.get(i); + if (primitiveCategories.get(i) == PrimitiveCategory.STRING) { + obj = fieldInspectors.get(i).getPrimitiveWritableObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); + } else { + obj = fieldInspectors.get(i).getPrimitiveJavaObject(sInspector.getStructFieldData(deSerializedValue, sInspector.getStructFieldRef(columnName))); + } + boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, obj); + if (!success) { + redoRecord = value; + if (partition != null) populatePartitionVectors(recordCount); + for (ValueVector v : vectors) { + v.getMutator().setValueCount(recordCount); + } + if (partition != null) populatePartitionVectors(recordCount); + return recordCount; + } + } + recordCount++; + } + for (ValueVector v : vectors) { + v.getMutator().setValueCount(recordCount); + } + if (partition != null) populatePartitionVectors(recordCount); + return recordCount; + } catch (IOException | SerDeException e) { + throw new DrillRuntimeException(e); + } + } + + @Override + public void cleanup() { + //To change body of implemented methods use File | Settings | File Templates. 
+ } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java new file mode 100644 index 000000000..bc2a16bc6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive; + +import com.fasterxml.jackson.annotation.*; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.commons.codec.binary.Base64; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.StorageEngineRegistry; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; + +import java.io.IOException; +import java.util.*; + +@JsonTypeName("hive-scan") +public class HiveScan extends AbstractGroupScan { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveScan.class); + + @JsonProperty("hive-table") + public HiveReadEntry hiveReadEntry; + @JsonIgnore + private Table table; + @JsonIgnore + private List<InputSplit> inputSplits = Lists.newArrayList(); + @JsonIgnore + public HiveStorageEngine storageEngine; + @JsonProperty("storageengine") + public HiveStorageEngineConfig engineConfig; + + @JsonIgnore + public List<Partition> partitions; + @JsonIgnore + private Collection<DrillbitEndpoint> endpoints; + + @JsonProperty("columns") + public List<FieldReference> columns; + + @JsonIgnore + List<List<InputSplit>> mappings; + + @JsonIgnore + Map<InputSplit, Partition> partitionMap = new HashMap(); + + @JsonCreator + public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storageengine") 
HiveStorageEngineConfig config, + @JsonProperty("columns") List<FieldReference> columns, + @JacksonInject StorageEngineRegistry engineRegistry) throws ExecutionSetupException { + this.hiveReadEntry = hiveReadEntry; + this.table = hiveReadEntry.getTable(); + this.engineConfig = config; + this.storageEngine = (HiveStorageEngine) engineRegistry.getEngine(config); + this.columns = columns; + this.partitions = hiveReadEntry.getPartitions(); + getSplits(); + endpoints = storageEngine.getContext().getBits(); + } + + public HiveScan(HiveReadEntry hiveReadEntry, HiveStorageEngine storageEngine, List<FieldReference> columns) throws ExecutionSetupException { + this.table = hiveReadEntry.getTable(); + this.hiveReadEntry = hiveReadEntry; + this.columns = columns; + this.partitions = hiveReadEntry.getPartitions(); + getSplits(); + endpoints = storageEngine.getContext().getBits(); + this.engineConfig = storageEngine.getConfig(); + } + + public List<FieldReference> getColumns() { + return columns; + } + + private void getSplits() throws ExecutionSetupException { + try { + if (partitions == null || partitions.size() == 0) { + Properties properties = MetaStoreUtils.getTableMetadata(table); + JobConf job = new JobConf(); + for (Object obj : properties.keySet()) { + job.set((String) obj, (String) properties.get(obj)); + } + InputFormat format = (InputFormat) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance(); + job.setInputFormat(format.getClass()); + Path path = new Path(table.getSd().getLocation()); + FileInputFormat.addInputPath(job, path); + format = job.getInputFormat(); + for (InputSplit split : format.getSplits(job, 1)) { + inputSplits.add(split); + } + for (InputSplit split : inputSplits) { + partitionMap.put(split, null); + } + } else { + for (Partition partition : partitions) { + Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table); + JobConf job = new JobConf(); + for (Object obj : properties.keySet()) { + job.set((String) obj, (String) properties.get(obj)); + } + InputFormat format = (InputFormat) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance(); + job.setInputFormat(format.getClass()); + FileInputFormat.addInputPath(job, new Path(partition.getSd().getLocation())); + format = job.getInputFormat(); + InputSplit[] splits = format.getSplits(job,1); + for (InputSplit split : splits) { + inputSplits.add(split); + partitionMap.put(split, partition); + } + } + } + } catch (ReflectiveOperationException | IOException e) { + throw new ExecutionSetupException(e); + } + } + + @Override + public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> endpoints) { + mappings = Lists.newArrayList(); + for (int i = 0; i < endpoints.size(); i++) { + mappings.add(new ArrayList<InputSplit>()); + } + int count = endpoints.size(); + for (int i = 0; i < inputSplits.size(); i++) { + mappings.get(i % count).add(inputSplits.get(i)); + } + } + + public static String serializeInputSplit(InputSplit split) throws IOException { + ByteArrayDataOutput byteArrayOutputStream = ByteStreams.newDataOutput(); + split.write(byteArrayOutputStream); + String encoded = Base64.encodeBase64String(byteArrayOutputStream.toByteArray()); + logger.debug("Encoded split string for split {} : {}", split, encoded); + return encoded; + } + + @Override + public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException { + try { + List<InputSplit> splits = mappings.get(minorFragmentId); + List<Partition> parts = Lists.newArrayList(); + 
List<String> encodedInputSplits = Lists.newArrayList(); + List<String> splitTypes = Lists.newArrayList(); + for (InputSplit split : splits) { + parts.add(partitionMap.get(split)); + encodedInputSplits.add(serializeInputSplit(split)); + splitTypes.add(split.getClass().getCanonicalName()); + } + if (parts.contains(null)) parts = null; + return new HiveSubScan(table, parts, encodedInputSplits, splitTypes, columns); + } catch (IOException | ReflectiveOperationException e) { + throw new ExecutionSetupException(e); + } + } + + @Override + public int getMaxParallelizationWidth() { + return inputSplits.size(); + } + + @Override + public List<EndpointAffinity> getOperatorAffinity() { + Map<String, DrillbitEndpoint> endpointMap = new HashMap(); + for (DrillbitEndpoint endpoint : endpoints) { + endpointMap.put(endpoint.getAddress(), endpoint); + logger.debug("endpoing address: {}", endpoint.getAddress()); + } + Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap(); + try { + long totalSize = 0; + for (InputSplit split : inputSplits) { + totalSize += Math.max(1, split.getLength()); + } + for (InputSplit split : inputSplits) { + float affinity = ((float) Math.max(1, split.getLength())) / totalSize; + for (String loc : split.getLocations()) { + logger.debug("split location: {}", loc); + DrillbitEndpoint endpoint = endpointMap.get(loc); + if (endpoint != null) { + if (affinityMap.containsKey(endpoint)) { + affinityMap.get(endpoint).addAffinity(affinity); + } else { + affinityMap.put(endpoint, new EndpointAffinity(endpoint, affinity)); + } + } + } + } + } catch (IOException e) { + throw new DrillRuntimeException(e); + } + for (DrillbitEndpoint ep : affinityMap.keySet()) { + Preconditions.checkNotNull(ep); + } + for (EndpointAffinity a : affinityMap.values()) { + Preconditions.checkNotNull(a.getEndpoint()); + } + return Lists.newArrayList(affinityMap.values()); + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(1, 2, 1, 1); + } + + @Override + public Size getSize() { + // TODO - this is wrong, need to populate correctly + return new Size(10,10); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + return new HiveScan(hiveReadEntry, storageEngine, columns); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java new file mode 100644 index 000000000..b155661ed --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive; + +import com.beust.jcommander.internal.Lists; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.RecordReader; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.TextInputFormat; + +import java.util.List; + +public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> { + + @Override + public RecordBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children) throws ExecutionSetupException { + List<RecordReader> readers = Lists.newArrayList(); + Table table = config.table; + List<InputSplit> splits = config.getInputSplits(); + List<Partition> partitions = config.partitions; + if (partitions == null || partitions.size() == 0) { + if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) && + table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) && + config.getColumns() != null) { + for (InputSplit split : splits) { + readers.add(new HiveTextRecordReader(table, null, split, config.getColumns(), context)); + } + } else { + for (InputSplit split : splits) { + readers.add(new HiveRecordReader(table, null, split, config.getColumns(), context)); + } + } + } else { + int i = 0; + if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) && + table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) && + config.getColumns() != null) { + for (InputSplit split : splits) { + readers.add(new HiveTextRecordReader(table, partitions.get(i++), split, config.getColumns(), context)); + } + } else { + for (InputSplit split : splits) { + readers.add(new HiveRecordReader(config.table, partitions.get(i++), split, config.getColumns(), context)); + } + } + } + return new ScanBatch(context, readers.iterator()); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java new file mode 100644 index 000000000..0f6f3bc4a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngine.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.ReadEntry; +import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStorageEngine; +import org.apache.drill.exec.store.SchemaProvider; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.List; + +public class HiveStorageEngine extends AbstractStorageEngine { + + private HiveStorageEngineConfig config; + private HiveConf hiveConf; + private HiveSchemaProvider schemaProvider; + static private DrillbitContext context; + + public HiveStorageEngine(HiveStorageEngineConfig config, DrillbitContext context) throws ExecutionSetupException { + this.config = config; + this.context = context; + this.hiveConf = config.getHiveConf(); + } + + public HiveStorageEngineConfig getConfig() { + return config; + } + + public DrillbitContext getContext() { + return context; + } + + @Override + public HiveScan getPhysicalScan(Scan scan) throws IOException { + HiveReadEntry hiveReadEntry = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){}); + try { + List<Partition> partitions = getSchemaProvider().getPartitions(hiveReadEntry.getTable().getDbName(), hiveReadEntry.getTable().getTableName()); + return new HiveScan(hiveReadEntry, this, null); + } catch (ExecutionSetupException | TException e) { + throw new DrillRuntimeException(e); + } + } + + @Override + public HiveSchemaProvider getSchemaProvider() { + try { + if (schemaProvider == null) { + schemaProvider = new HiveSchemaProvider(config, context.getConfig()); + } + return schemaProvider; + } catch (ExecutionSetupException e) { + throw new DrillRuntimeException(e); + } + } + + List<String> getPartitions(String dbName, String tableName) throws TException { + List<Partition> partitions = getSchemaProvider().getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE); + List<String> partitionLocations = Lists.newArrayList(); + if (partitions == null) return null; + for (Partition part : partitions) { + partitionLocations.add(part.getSd().getLocation()); + } + return partitionLocations; + } + + public static class HiveEntry implements ReadEntry { + + private Table table; + + public HiveEntry(Table table) { + this.table = table; + } + + public Table getTable() { + return table; + } + + @Override + public OperatorCost getCost() { + throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " + + "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost."); + } + + @Override + public Size getSize() 
{ + throw new UnsupportedOperationException(this.getClass().getCanonicalName() + " is only for extracting path data from " + + "selections inside a scan node from a logical plan, it cannot be used in an executing plan and has no cost."); + } + } + + public static class HiveSchemaProvider implements SchemaProvider { + + private HiveConf hiveConf; + private HiveMetaStoreClient metaClient; + + public HiveSchemaProvider(HiveStorageEngineConfig config, DrillConfig dConfig) throws ExecutionSetupException { + hiveConf = config.getHiveConf(); + } + + public HiveMetaStoreClient getMetaClient() throws MetaException { + if (metaClient == null) { + metaClient = new HiveMetaStoreClient(hiveConf); + } + return metaClient; + } + + public Table getTable(String dbName, String tableName) throws TException { + HiveMetaStoreClient mClient = getMetaClient(); + try { + return mClient.getTable(dbName, tableName); + }catch (NoSuchObjectException e) { + logger.error("Database: {} table: {} not found", dbName, tableName); + throw new RuntimeException(e); + } catch (TException e) { + mClient.reconnect(); + return mClient.getTable(dbName, tableName); + } + } + + List<Partition> getPartitions(String dbName, String tableName) throws TException { + HiveMetaStoreClient mClient = getMetaClient(); + List<Partition> partitions; + try { + partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE); + } catch (TException e) { + mClient.reconnect(); + partitions = getMetaClient().listPartitions(dbName, tableName, Short.MAX_VALUE); + } + return partitions; + } + + @Override + public HiveReadEntry getSelectionBaseOnName(String name) { + String[] dbNameTableName = name.split("\\."); + String dbName; + String t; + if (dbNameTableName.length > 1) { + dbName = dbNameTableName[0]; + t = dbNameTableName[1]; + } else { + dbName = "default"; + t = name; + } + + try { + Table table = getTable(dbName, t); + List<Partition> partitions = getPartitions(dbName, t); + List<HiveTable.HivePartition> hivePartitions = Lists.newArrayList(); + for(Partition part : partitions) { + hivePartitions.add(new HiveTable.HivePartition(part)); + } + if (hivePartitions.size() == 0) hivePartitions = null; + return new HiveReadEntry(new HiveTable(table), hivePartitions); + } catch (NoSuchObjectException e) { + throw new DrillRuntimeException(e); + } catch (TException e) { + throw new DrillRuntimeException(e); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java index 0a2c5ded0..91fec3b20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStorageEngineConfig.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.hive; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.logical.StorageEngineConfigBase; @@ -27,9 +28,12 @@ import java.util.Map; @JsonTypeName("hive") public class HiveStorageEngineConfig extends StorageEngineConfigBase { - private Map<String, String> configProps; + @JsonProperty + public Map<String, String> configProps; + @JsonIgnore private HiveConf hiveConf; + @JsonIgnore public HiveConf getHiveConf() { if (hiveConf == null) { hiveConf = new HiveConf(); diff 
--git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java new file mode 100644 index 000000000..8ff7c823c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive; + +import com.beust.jcommander.internal.Lists; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Iterators; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteStreams; +import org.apache.commons.codec.binary.Base64; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.*; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class HiveSubScan extends AbstractBase implements SubScan { + + @JsonProperty("splits") + public List<String> encodedSplits; + @JsonProperty("hive-table") + public Table table; + @JsonProperty("partitions") + public List<Partition> partitions; + @JsonIgnore + private List<InputSplit> inputSplits = Lists.newArrayList(); + @JsonProperty("splitClass") + public List<String> splitClasses; + + @JsonProperty("columns") + public List<FieldReference> columns; + + @JsonCreator + public HiveSubScan(@JsonProperty("hive-table") Table table, + @JsonProperty("partition") List<Partition> partitions, + @JsonProperty("splits") List<String> encodedSplits, + @JsonProperty("splitClasses") List<String> splitClasses, + @JsonProperty("columns") List<FieldReference> columns) throws IOException, ReflectiveOperationException { + this.table = table; + this.partitions = partitions; + this.encodedSplits = encodedSplits; + this.splitClasses = splitClasses; + this.columns = columns; + + for (int i = 0; i < encodedSplits.size(); i++) { + inputSplits.add(deserializeInputSplit(encodedSplits.get(i), splitClasses.get(i))); + } + } + + public static InputSplit deserializeInputSplit(String base64, String className) throws IOException, ReflectiveOperationException{ + InputSplit split; + if (Class.forName(className) == FileSplit.class) { + split = new FileSplit((Path) null, 0, 0, (String[])null); + } else { + split = (InputSplit) 
Class.forName(className).getConstructor().newInstance(); + } + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(Base64.decodeBase64(base64)); + split.readFields(byteArrayDataInput); + return split; + } + + public List<FieldReference> getColumns() { + return columns; + } + + public List<InputSplit> getInputSplits() { + return inputSplits; + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(1, 2, 1, 1); + } + + @Override + public Size getSize() { + // TODO - this is wrong, need to populate correctly + return new Size(10,10); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + try { + return new HiveSubScan(table, partitions, encodedSplits, splitClasses, columns); + } catch (IOException | ReflectiveOperationException e) { + throw new ExecutionSetupException(e); + } + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return Iterators.emptyIterator(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java new file mode 100644 index 000000000..385880408 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTable.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive; + +import com.beust.jcommander.internal.Lists; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.hadoop.hive.metastore.api.*; + +import java.util.List; +import java.util.Map; + +@JsonTypeName("table") +public class HiveTable { + + @JsonIgnore + private Table table; + + @JsonProperty + public String tableName; + @JsonProperty + public String dbName; + @JsonProperty + public String owner; + @JsonProperty + public int createTime; + @JsonProperty + public int lastAccessTime; + @JsonProperty + public int retention; + @JsonProperty + public StorageDescriptorWrapper sd; + @JsonProperty + public List<FieldSchemaWrapper> partitionKeys; + @JsonProperty + public Map<String,String> parameters; + @JsonProperty + public String viewOriginalText; + @JsonProperty + public String viewExpandedText; + @JsonProperty + public String tableType; + + @JsonCreator + public HiveTable(@JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("owner") String owner, @JsonProperty("createTime") int createTime, + @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("retention") int retention, @JsonProperty("sd") StorageDescriptorWrapper sd, + @JsonProperty("partitionKeys") List<FieldSchemaWrapper> partitionKeys, @JsonProperty("parameters") Map<String, String> parameters, + @JsonProperty("viewOriginalText") String viewOriginalText, @JsonProperty("viewExpandedText") String viewExpandedText, @JsonProperty("tableType") String tableType + ) { + this.tableName = tableName; + this.dbName = dbName; + this.owner = owner; + this.createTime = createTime; + this.lastAccessTime = lastAccessTime; + this.retention = retention; + this.sd = sd; + this.partitionKeys = partitionKeys; + this.parameters = parameters; + this.viewOriginalText = viewOriginalText; + this.viewExpandedText = viewExpandedText; + this.tableType = tableType; + + List<FieldSchema> partitionKeysUnwrapped = Lists.newArrayList(); + for (FieldSchemaWrapper w : partitionKeys) partitionKeysUnwrapped.add(w.getFieldSchema()); + StorageDescriptor sdUnwrapped = sd.getSd(); + this.table = new Table(tableName, dbName, owner, createTime, lastAccessTime, retention, sdUnwrapped, partitionKeysUnwrapped, + parameters, viewOriginalText, viewExpandedText, tableType); + } + + public HiveTable(Table table) { + if (table == null) return; + this.table = table; + this.tableName = table.getTableName(); + this.dbName = table.getDbName(); + this.owner = table.getOwner(); + this.createTime = table.getCreateTime(); + this.lastAccessTime = table.getLastAccessTime(); + this.retention = table.getRetention(); + this.sd = new StorageDescriptorWrapper(table.getSd()); + this.partitionKeys = Lists.newArrayList(); + for (FieldSchema f : table.getPartitionKeys()) this.partitionKeys.add(new FieldSchemaWrapper(f)); + this.parameters = table.getParameters(); + this.viewOriginalText = table.getViewOriginalText(); + this.viewExpandedText = table.getViewExpandedText(); + this.tableType = table.getTableType(); + } + + @JsonIgnore + public Table getTable() { + return table; + } + + public static class HivePartition { + + @JsonIgnore + private Partition partition; + + @JsonProperty + public List<String> values; + @JsonProperty + public String tableName; + @JsonProperty + public String dbName; + @JsonProperty + public int createTime; 
+ @JsonProperty + public int lastAccessTime; + @JsonProperty + public StorageDescriptorWrapper sd; + @JsonProperty + public Map<String,String> parameters; + + @JsonCreator + public HivePartition(@JsonProperty("values") List<String> values, @JsonProperty("tableName") String tableName, @JsonProperty("dbName") String dbName, @JsonProperty("createTime") int createTime, + @JsonProperty("lastAccessTime") int lastAccessTime, @JsonProperty("sd") StorageDescriptorWrapper sd, + @JsonProperty("parameters") Map<String, String> parameters + ) { + this.values = values; + this.tableName = tableName; + this.dbName = dbName; + this.createTime = createTime; + this.lastAccessTime = lastAccessTime; + this.sd = sd; + this.parameters = parameters; + + StorageDescriptor sdUnwrapped = sd.getSd(); + this.partition = new org.apache.hadoop.hive.metastore.api.Partition(values, tableName, dbName, createTime, lastAccessTime, sdUnwrapped, parameters); + } + + public HivePartition(Partition partition) { + if (partition == null) return; + this.partition = partition; + this.values = partition.getValues(); + this.tableName = partition.getTableName(); + this.dbName = partition.getDbName(); + this.createTime = partition.getCreateTime(); + this.lastAccessTime = partition.getLastAccessTime(); + this.sd = new StorageDescriptorWrapper(partition.getSd()); + this.parameters = partition.getParameters(); + } + + @JsonIgnore + public Partition getPartition() { + return partition; + } + } + + public static class StorageDescriptorWrapper { + @JsonIgnore + private StorageDescriptor sd; + @JsonProperty + public List<FieldSchemaWrapper> cols; + @JsonProperty + public String location; + @JsonProperty + public String inputFormat; + @JsonProperty + public String outputFormat; + @JsonProperty + public boolean compressed; + @JsonProperty + public int numBuckets; + @JsonProperty + public SerDeInfoWrapper serDeInfo; +// @JsonProperty +// public List<String> bucketCols; + @JsonProperty + public List<OrderWrapper> sortCols; + @JsonProperty + public Map<String,String> parameters; + + @JsonCreator + public StorageDescriptorWrapper(@JsonProperty("cols") List<FieldSchemaWrapper> cols, @JsonProperty("location") String location, @JsonProperty("inputFormat") String inputFormat, + @JsonProperty("outputFormat") String outputFormat, @JsonProperty("compressed") boolean compressed, @JsonProperty("numBuckets") int numBuckets, + @JsonProperty("serDeInfo") SerDeInfoWrapper serDeInfo, @JsonProperty("sortCols") List<OrderWrapper> sortCols, + @JsonProperty("parameters") Map<String,String> parameters) { + this.cols = cols; + this.location = location; + this.inputFormat = inputFormat; + this.outputFormat = outputFormat; + this.compressed = compressed; + this.numBuckets = numBuckets; + this.serDeInfo = serDeInfo; +// this.bucketCols = bucketCols; + this.sortCols = sortCols; + this.parameters = parameters; + List<FieldSchema> colsUnwrapped = Lists.newArrayList(); + for (FieldSchemaWrapper w: cols) colsUnwrapped.add(w.getFieldSchema()); + SerDeInfo serDeInfoUnwrapped = serDeInfo.getSerDeInfo(); + List<Order> sortColsUnwrapped = Lists.newArrayList(); + for (OrderWrapper w : sortCols) sortColsUnwrapped.add(w.getOrder()); +// this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped, +// bucketCols, sortColsUnwrapped, parameters); + this.sd = new StorageDescriptor(colsUnwrapped, location, inputFormat, outputFormat, compressed, numBuckets, serDeInfoUnwrapped, + null, sortColsUnwrapped, parameters); + } + + 
public StorageDescriptorWrapper(StorageDescriptor sd) { + this.sd = sd; + this.cols = Lists.newArrayList(); + for (FieldSchema f : sd.getCols()) this.cols.add(new FieldSchemaWrapper(f)); + this.location = sd.getLocation(); + this.inputFormat = sd.getInputFormat(); + this.outputFormat = sd.getOutputFormat(); + this.compressed = sd.isCompressed(); + this.numBuckets = sd.getNumBuckets(); + this.serDeInfo = new SerDeInfoWrapper(sd.getSerdeInfo()); +// this.bucketCols = sd.getBucketCols(); + this.sortCols = Lists.newArrayList(); + for (Order o : sd.getSortCols()) this.sortCols.add(new OrderWrapper(o)); + this.parameters = sd.getParameters(); + } + + @JsonIgnore + public StorageDescriptor getSd() { + return sd; + } + + } + + public static class SerDeInfoWrapper { + @JsonIgnore + private SerDeInfo serDeInfo; + @JsonProperty + public String name; + @JsonProperty + public String serializationLib; + @JsonProperty + public Map<String,String> parameters; + + @JsonCreator + public SerDeInfoWrapper(@JsonProperty("name") String name, @JsonProperty("serializationLib") String serializationLib, @JsonProperty("parameters") Map<String, String> parameters) { + this.name = name; + this.serializationLib = serializationLib; + this.parameters = parameters; + this.serDeInfo = new SerDeInfo(name, serializationLib, parameters); + } + + public SerDeInfoWrapper(SerDeInfo serDeInfo) { + this.serDeInfo = serDeInfo; + this.name = serDeInfo.getName(); + this.serializationLib = serDeInfo.getSerializationLib(); + this.parameters = serDeInfo.getParameters(); + } + + @JsonIgnore + public SerDeInfo getSerDeInfo() { + return serDeInfo; + } + } + + public static class FieldSchemaWrapper { + @JsonIgnore + private FieldSchema fieldSchema; + @JsonProperty + public String name; + @JsonProperty + public String type; + @JsonProperty + public String comment; + + @JsonCreator + public FieldSchemaWrapper(@JsonProperty("name") String name, @JsonProperty("type") String type, @JsonProperty("comment") String comment) { + this.name = name; + this.type = type; + this.comment = comment; + this.fieldSchema = new FieldSchema(name, type, comment); + } + + public FieldSchemaWrapper(FieldSchema fieldSchema) { + this.fieldSchema = fieldSchema; + this.name = fieldSchema.getName(); + this.type = fieldSchema.getType(); + this.comment = fieldSchema.getComment(); + } + + @JsonIgnore + public FieldSchema getFieldSchema() { + return fieldSchema; + } + } + + public static class OrderWrapper { + @JsonIgnore + private Order ord; + @JsonProperty + public String col; + @JsonProperty + public int order; + + @JsonCreator + public OrderWrapper(@JsonProperty("col") String col, @JsonProperty("order") int order) { + this.col = col; + this.order = order; + this.ord = new Order(col, order); + } + + public OrderWrapper(Order ord) { + this.ord = ord; + this.col = ord.getCol(); + this.order = ord.getOrder(); + } + + @JsonIgnore + public Order getOrder() { + return ord; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java new file mode 100644 index 000000000..1e47684cd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.hive; + +import com.google.common.collect.Lists; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.vector.allocator.VectorAllocator; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputSplit; + +import java.io.IOException; +import java.util.List; + +public class HiveTextRecordReader extends HiveRecordReader { + + public final byte delimiter; + public final List<Integer> columnIds; + private final int numCols; + + public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException { + super(table, partition, inputSplit, columns, context); + String d = table.getSd().getSerdeInfo().getParameters().get("field.delim"); + if (d != null) { + delimiter = d.getBytes()[0]; + } else { + delimiter = (byte) 1; + } + assert delimiter > 0; + List<Integer> ids = Lists.newArrayList(); + for (int i = 0; i < tableColumns.size(); i++) { + if (columnNames.contains(tableColumns.get(i))) { + ids.add(i); + } + } + columnIds = ids; + numCols = tableColumns.size(); + } + + public boolean setValue(PrimitiveObjectInspector.PrimitiveCategory pCat, ValueVector vv, int index, byte[] bytes, int start) { + switch(pCat) { + case BINARY: + throw new UnsupportedOperationException(); + case BOOLEAN: + throw new UnsupportedOperationException(); + case BYTE: + throw new UnsupportedOperationException(); + case DECIMAL: + throw new UnsupportedOperationException(); + case DOUBLE: + throw new UnsupportedOperationException(); + case FLOAT: + throw new UnsupportedOperationException(); + case INT: { + int value = 0; + byte b; + for (int i = start; (b = bytes[i]) != delimiter; i++) { + value = (value * 10) + b - 48; + } + ((IntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors + return true; + } + case LONG: { + long value = 0; + byte b; + for (int i = start; (b = bytes[i]) != delimiter; i++) { + value = (value * 10) + b - 48; + } + ((BigIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors + return true; + } + case SHORT: + throw new UnsupportedOperationException(); + case STRING: { + int end = start; + for (int i = start; i < bytes.length; i++) { + if (bytes[i] == delimiter) { + end = i; + break; + } + end = bytes.length; + } + return ((VarCharVector) 
vv).getMutator().setSafe(index, bytes, start, end - start); + } + case TIMESTAMP: + throw new UnsupportedOperationException(); + + default: + throw new UnsupportedOperationException("Could not determine type"); + } + } + + + @Override + public int next() { + for (ValueVector vv : vectors) { + VectorAllocator.getAllocator(vv, 50).alloc(TARGET_RECORD_COUNT); + } + try { + int recordCount = 0; + if (redoRecord != null) { + // finish the record that did not fit into the previous batch before reading new ones + int length = ((Text) value).getLength(); + byte[] bytes = ((Text) value).getBytes(); + int[] delimPositions = new int[numCols + 1]; + delimPositions[0] = -1; + int p = 1; + for (int i = 0; i < length; i++) { + if (bytes[i] == delimiter) { + delimPositions[p++] = i; + } + } + for (int i = 0; i < columnIds.size(); i++) { + int id = columnIds.get(i); + boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1); + if (!success) { + throw new DrillRuntimeException(String.format("Failed to write value for column %s", columnNames.get(id))); + } + } + redoRecord = null; + recordCount++; + } + while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) { + int length = ((Text) value).getLength(); + byte[] bytes = ((Text) value).getBytes(); + int[] delimPositions = new int[numCols + 1]; + delimPositions[0] = -1; + int p = 1; + for (int i = 0; i < length; i++) { + if (bytes[i] == delimiter) { + delimPositions[p++] = i; + } + } + for (int i = 0; i < columnIds.size(); i++) { + int id = columnIds.get(i); + boolean success = setValue(primitiveCategories.get(i), vectors.get(i), recordCount, bytes, delimPositions[id] + 1); + if (!success) { + redoRecord = value; + if (partition != null) populatePartitionVectors(recordCount); + return recordCount; + } + } + recordCount++; + } + if (partition != null) populatePartitionVectors(recordCount); + return recordCount; + } catch (IOException e) { + throw new DrillRuntimeException(e); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java index 209961dd6..86be49e32 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.store.ClassPathFileSystem; import org.apache.drill.exec.store.SchemaProvider; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import com.beust.jcommander.internal.Lists; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java index aa68752b7..aa68752b7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index c801163ac..4dd08c132 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -82,5 +82,6 @@ drill.exec: { delete: false, size: 100000000 } - } + }, + cache.hazel.subnets: ["*.*.*.*"] }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java index 38ec007a8..71e6283e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSchemaProvider.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPlan.java @@ -15,25 +15,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.drill.exec.store.hive; +package org.apache.drill.exec; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.store.SchemaProvider; +import org.apache.drill.exec.client.QuerySubmitter; +import org.junit.Ignore; +import org.junit.Test; -import com.beust.jcommander.internal.Lists; +import java.io.IOException; -public class HiveSchemaProvider implements SchemaProvider{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveSchemaProvider.class); +/** + * Created with IntelliJ IDEA. + * User: sphillips + * Date: 1/24/14 + * Time: 3:46 PM + * To change this template use File | Settings | File Templates. + */ +public class TestPlan { - final HiveStorageEngineConfig configuration; + String location = "/Users/sphillips/hive-lineitem-orderkey"; + String type = "physical"; + String zkQuorum = null; + boolean local = true; + int bits = 1; - public HiveSchemaProvider(HiveStorageEngineConfig configuration, DrillConfig config){ - this.configuration = configuration; - } - @Override - public Object getSelectionBaseOnName(String tableName) { - HiveReadEntry re = new HiveReadEntry(configuration.getHiveConf(), tableName); - return Lists.newArrayList(re); + @Test + @Ignore + public void testSubmitPlan() throws Exception { + QuerySubmitter submitter = new QuerySubmitter(); + submitter.submitQuery(location, type, zkQuorum, local, bits); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java new file mode 100644 index 000000000..c6edc2000 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/hive/TestHiveScan.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.hive; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Created with IntelliJ IDEA. + * User: sphillips + * Date: 1/23/14 + * Time: 5:22 AM + * To change this template use File | Settings | File Templates. + */ +public class TestHiveScan extends PopUnitTestBase { + @Ignore + @Test + public void twoBitTwoExchangeTwoEntryRun() throws Exception { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); + Drillbit bit2 = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + + bit1.run(); + bit2.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/hive/test.json"), + Charsets.UTF_8)); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(100, count); + } + } +} diff --git a/exec/java-exec/src/test/resources/hive/test.json b/exec/java-exec/src/test/resources/hive/test.json new file mode 100644 index 000000000..a039d9e39 --- /dev/null +++ b/exec/java-exec/src/test/resources/hive/test.json @@ -0,0 +1,75 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"hive-scan", + storageengine: { type: "hive"}, + hive-table: { + "tableName" : "nation", + "dbName" : "default", + "owner" : "root", + "createTime" : 1386876893, + "lastAccessTime" : 0, + "retention" : 0, + "sd" : { + "cols" : [ { + "name" : "n_nationkey", + "type" : "bigint", + "comment" : null + }, { + "name" : "n_name", + "type" : "string", + "comment" : null + }, { + "name" : "n_regionkey", + "type" : "bigint", + "comment" : null + }, { + "name" : "n_comment", + "type" : "string", + "comment" : null + } ], + "location" : "maprfs:/user/hive/warehouse/nation", + "inputFormat" : "org.apache.hadoop.mapred.TextInputFormat", + "outputFormat" : "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "compressed" : false, + "numBuckets" : 0, + "serDeInfo" : { + "name" : null, + "serializationLib" : "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "parameters" : { + "serialization.format" : "|", + "field.delim" : "|" + } + }, + "sortCols" : [ ], + "parameters" : { } + }, + "partitionKeys" : [ ], + "parameters" : { + "numPartitions" : "0", + "numFiles" : "1", + "transient_lastDdlTime" : "1386877487", + "totalSize" : "2224", + "numRows" : "0", + "rawDataSize" : "0" + }, + "viewOriginalText" : null, + "viewExpandedText" : null, + "tableType" : "MANAGED_TABLE" + } + }, + { + @id: 2, + child: 1, + pop: "screen" + } + ] +}
\ No newline at end of file
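
Note (not part of the patch): HiveSubScan carries its Hadoop InputSplits as base64 strings so they can be embedded in the JSON physical plan. The encoding side is not shown in this diff (it presumably lives in the planning-time Hive scan operator), so the following is only a minimal sketch of the convention that deserializeInputSplit above expects, built from the same Guava, commons-codec and Hadoop mapred classes the new file imports. The file path, offset and length below are illustrative values, not taken from the patch.

import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.exec.store.hive.HiveSubScan;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;

import java.io.IOException;

public class SplitRoundTripSketch {

  /** Writable -> bytes -> base64; the inverse of HiveSubScan.deserializeInputSplit. */
  public static String serializeInputSplit(InputSplit split) throws IOException {
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    split.write(out);                                      // standard Hadoop Writable serialization
    return Base64.encodeBase64String(out.toByteArray());   // ASCII-safe for embedding in the JSON plan
  }

  public static void main(String[] args) throws Exception {
    // Illustrative split over the nation table location referenced by the test plan.
    FileSplit split = new FileSplit(new Path("maprfs:/user/hive/warehouse/nation/nation.tbl"),
        0, 2224, new String[] {});
    String encoded = serializeInputSplit(split);
    // Feed the encoded form back through the deserializer added by this patch.
    InputSplit decoded = HiveSubScan.deserializeInputSplit(encoded, FileSplit.class.getName());
    System.out.println(decoded);                           // prints the same path, offset and length
  }
}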