aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill
diff options
context:
space:
mode:
authorrebase <builds@mapr.com>2018-02-12 14:10:56 -0800
committerAman Sinha <asinha@maprtech.com>2018-10-25 16:08:51 -0700
commit0abcbe3f36bf6c0a2b5fe07a778d201ead8dd2ce (patch)
tree45fe91b40a729ed49de8cdc53dc932a50d4633b0 /exec/java-exec/src/main/java/org/apache/drill
parent61e8b464063299dc1f67445157a46c4939b0cace (diff)
DRILL-6381: (Part 1) Secondary Index framework
  1. Secondary Index planning interfaces and abstract classes like DbGroupScan, DbSubScan, IndexDescriptor etc.   2. Statistics and Cost model interfaces/classes: PluginCost, Statistics, StatisticsPayload, AbstractIndexStatistics   3. ScanBatch and RecordReader to support repeatable scan   4. Secondary Index execution related interfaces: RangePartitionSender, RowKeyJoin, PartitionFunction 5. MD-3979: Query using cast index plan fails with NPE Co-authored-by: Aman Sinha <asinha@maprtech.com> Co-authored-by: chunhui-shi <cshi@maprtech.com> Co-authored-by: Gautam Parai <gparai@maprtech.com> Co-authored-by: Padma Penumarthy <ppenumar97@yahoo.com> Co-authored-by: Hanumath Rao Maduri <hmaduri@maprtech.com> Conflicts: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java protocol/src/main/protobuf/UserBitShared.proto
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java95
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java37
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java129
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java43
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java)52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java96
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java74
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java37
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java75
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java278
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java110
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java85
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java99
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java105
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java68
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java110
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java37
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java63
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java64
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java36
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java262
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java24
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java56
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java291
41 files changed, 2844 insertions, 28 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 21e16eb5b..cb0fc5cf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -807,6 +807,8 @@ public final class ExecConstants {
*/
public static final String ENABLE_ITERATOR_VALIDATION = "drill.exec.debug.validate_iterators";
+ public static final String QUERY_ROWKEYJOIN_BATCHSIZE_KEY = "exec.query.rowkeyjoin_batchsize";
+ public static final PositiveLongValidator QUERY_ROWKEYJOIN_BATCHSIZE = new PositiveLongValidator(QUERY_ROWKEYJOIN_BATCHSIZE_KEY, Long.MAX_VALUE, null);
/**
* When iterator validation is enabled, additionally validates the vectors in
* each batch passed to each iterator.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java
new file mode 100644
index 000000000..42e4bb9ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbGroupScan.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.index.IndexCollection;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+
+public abstract class AbstractDbGroupScan extends AbstractGroupScan implements DbGroupScan {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDbGroupScan.class);
+
+ private static final String ROW_KEY = "_id";
+ private static final SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
+
+ public AbstractDbGroupScan(String userName) {
+ super(userName);
+ }
+
+ public AbstractDbGroupScan(AbstractDbGroupScan that) {
+ super(that);
+ }
+
+ public abstract AbstractStoragePlugin getStoragePlugin();
+
+ public abstract StoragePluginConfig getStorageConfig();
+
+ public abstract List<SchemaPath> getColumns();
+
+ @Override
+ public boolean supportsSecondaryIndex() {
+ return false;
+ }
+
+ @Override
+ public IndexCollection getSecondaryIndexCollection(RelNode scanrel) {
+ return null;
+ }
+
+ @Override
+ public boolean supportsRestrictedScan() {
+ return false;
+ }
+
+ @Override
+ public boolean isRestrictedScan() {
+ return false;
+ }
+
+ @Override
+ public DbGroupScan getRestrictedScan(List<SchemaPath> columns) {
+ return null;
+ }
+
+ @Override
+ public String getRowKeyName() {
+ return ROW_KEY;
+ }
+
+ @Override
+ public SchemaPath getRowKeyPath() {
+ return ROW_KEY_PATH;
+ }
+
+ @Override
+ public PartitionFunction getRangePartitionFunction(List<FieldReference> refList) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PluginCost getPluginCostModel() {
+ return null;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java
new file mode 100644
index 000000000..caa583162
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractDbSubScan.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+
+public abstract class AbstractDbSubScan extends AbstractSubScan implements DbSubScan {
+
+ public AbstractDbSubScan(String userName) {
+ super(userName);
+ }
+
+ public boolean isRestrictedSubScan() {
+ return false;
+ }
+
+ @Override
+ public void addJoinForRestrictedSubScan(RowKeyJoin batch) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 340c303f7..ca82ca621 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.config.IteratorValidator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.Limit;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.config.ProducerConsumer;
import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RangePartitionSender;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
@@ -157,6 +159,16 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
+ public T visitHashPartitionSender(HashToRandomExchange op, X value) throws E {
+ return visitExchange(op, value);
+ }
+
+ @Override
+ public T visitRangePartitionSender(RangePartitionSender op, X value) throws E {
+ return visitSender(op, value);
+ }
+
+ @Override
public T visitBroadcastSender(BroadcastSender op, X value) throws E {
return visitSender(op, value);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java
new file mode 100644
index 000000000..e16fba1ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbGroupScan.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.index.IndexCollection;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
+import org.apache.drill.exec.planner.index.Statistics;
+
+import java.util.List;
+
+/**
+ * A DbGroupScan operator represents the scan associated with a database. The underlying
+ * database may support secondary indexes, so there are interface methods for indexes.
+ */
+public interface DbGroupScan extends GroupScan {
+
+
+ @JsonIgnore
+ public boolean supportsSecondaryIndex();
+
+ /**
+ * Get the index collection associated with this table if any
+ */
+ @JsonIgnore
+ public IndexCollection getSecondaryIndexCollection(RelNode scan);
+
+ /**
+ * Set the artificial row count after applying the {@link RexNode} condition
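+ * @param condition the {@link RexNode} filter condition the row count corresponds to
+ * @param count the artificial row count to set
+ * @param capRowCount presumably an upper bound (cap) on the row count; TODO confirm against implementations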
+ */
+ @JsonIgnore
+ public void setRowCount(RexNode condition, double count, double capRowCount);
+
+ /**
+ * Get the row count after applying the {@link RexNode} condition
+ * @param condition filter to apply
+ * @param scanRel the current scan rel
+ * @return row count post filtering
+ */
+ @JsonIgnore
+ public double getRowCount(RexNode condition, RelNode scanRel);
+
+ /**
+ * Get the statistics for this {@link DbGroupScan}
+ * @return the {@link Statistics} for this Scan
+ */
+ @JsonIgnore
+ public Statistics getStatistics();
+
+ public List<SchemaPath> getColumns();
+
+ public void setCostFactor(double sel);
+
+ @JsonIgnore
+ boolean isIndexScan();
+
+ /**
+ * Whether this DbGroupScan supports creating a restricted (skip) scan
+ * @return true if restricted scan is supported, false otherwise
+ */
+ @JsonIgnore
+ boolean supportsRestrictedScan();
+
+ /**
+ * Whether this DbGroupScan is itself a restricted scan
+ * @return true if this DbGroupScan is itself a restricted scan, false otherwise
+ */
+ @JsonIgnore
+ boolean isRestrictedScan();
+
+ /**
+ * If this DbGroupScan supports restricted scan, create a restricted scan from this DbGroupScan.
+ * @param columns
+ * @return a non-null DbGroupScan if restricted scan is supported, null otherwise
+ */
+ @JsonIgnore
+ DbGroupScan getRestrictedScan(List<SchemaPath> columns);
+
+ @JsonIgnore
+ String getRowKeyName();
+
+ @JsonIgnore
+ String getIndexHint();
+
+ @JsonIgnore
+ SchemaPath getRowKeyPath();
+
+ /**
+ * Get a partition function instance for range based partitioning
+ * @param refList a list of FieldReference exprs that are participating in the range partitioning
+ * @return instance of a partitioning function
+ */
+ @JsonIgnore
+ PartitionFunction getRangePartitionFunction(List<FieldReference> refList);
+
+ /**
+ * Get the format plugin cost model. The cost model will provide cost factors such as seq. scan cost,
+ * random scan cost, block size.
+ * @return a PluginCost cost model
+ */
+ @JsonIgnore
+ PluginCost getPluginCostModel();
+
+ @JsonIgnore
+ boolean isFilterPushedDown();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java
new file mode 100644
index 000000000..874468d4e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/DbSubScan.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+
+public interface DbSubScan extends SubScan {
+
+ /**
+ * Whether this subscan is a restricted (skip) subscan
+ * @return true if this subscan is a restricted subscan, false otherwise
+ */
+ @JsonIgnore
+ boolean isRestrictedSubScan();
+
+ /**
+ * For a restricted sub-scan, this method allows associating a (hash)join instance. A subscan within a minor
+ * fragment must have a corresponding (hash)join batch instance from which it will retrieve its set of
+ * rowkeys to perform the restricted scan.
+ * @param batch the {@link RowKeyJoin} instance to associate with this restricted sub-scan
+ */
+ @JsonIgnore
+ void addJoinForRestrictedSubScan(RowKeyJoin batch);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java
new file mode 100644
index 000000000..1047e829d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/IndexGroupScan.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.base;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.index.Statistics;
+
+
+import java.util.List;
+
+/**
+ * An IndexGroupScan operator represents the scan associated with an Index.
+ */
+public interface IndexGroupScan extends GroupScan {
+
+ /**
+ * Get the column ordinal of the rowkey column from the output schema of the IndexGroupScan
+ * @return the ordinal of the rowkey column in the output schema of this IndexGroupScan
+ */
+ @JsonIgnore
+ public int getRowKeyOrdinal();
+
+ /**
+ * Set the artificial row count after applying the {@link RexNode} condition
+ * Mainly used for debugging
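+ * @param condition the {@link RexNode} filter condition the row count corresponds to
+ * @param count the artificial row count to set
+ * @param capRowCount presumably an upper bound (cap) on the row count; TODO confirm against implementations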
+ */
+ @JsonIgnore
+ public void setRowCount(RexNode condition, double count, double capRowCount);
+
+ /**
+ * Get the row count after applying the {@link RexNode} condition
+ * @param condition filter to apply
+ * @return row count post filtering
+ */
+ @JsonIgnore
+ public double getRowCount(RexNode condition, RelNode scanRel);
+
+ /**
+ * Set the statistics for {@link IndexGroupScan}
+ * @param statistics the {@link Statistics} to set on this scan
+ */
+ @JsonIgnore
+ public void setStatistics(Statistics statistics);
+
+ @JsonIgnore
+ public void setColumns(List<SchemaPath> columns);
+
+ @JsonIgnore
+ public List<SchemaPath> getColumns();
+
+ @JsonIgnore
+ public void setParallelizationWidth(int width);
+
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index f2e53eb03..1bb1545c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.physical.config.IteratorValidator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.Limit;
@@ -29,6 +30,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.config.ProducerConsumer;
import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.config.RangePartitionSender;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.config.Sort;
@@ -73,6 +75,8 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA value) throws EXCEP;
public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) throws EXCEP;
public RETURN visitMergingReceiver(MergingReceiverPOP op, EXTRA value) throws EXCEP;
+ public RETURN visitHashPartitionSender(HashToRandomExchange op, EXTRA value) throws EXCEP;
+ public RETURN visitRangePartitionSender(RangePartitionSender op, EXTRA value) throws EXCEP;
public RETURN visitBroadcastSender(BroadcastSender op, EXTRA value) throws EXCEP;
public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java
index 88c3be0e3..0c0852a07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangePartitionSender.java
@@ -17,57 +17,57 @@
*/
package org.apache.drill.exec.physical.config;
-import java.util.Collections;
import java.util.List;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.base.AbstractSender;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.planner.physical.PartitionFunction;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-@JsonTypeName("range-sender")
-public class RangeSender extends AbstractSender{
+@JsonTypeName("range-partition-sender")
+public class RangePartitionSender extends AbstractSender{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangeSender.class);
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangePartitionSender.class);
- List<EndpointPartition> partitions;
+ // The number of records in the outgoing batch. This is overriding the default value in Partitioner
+ public static final int RANGE_PARTITION_OUTGOING_BATCH_SIZE = (1 << 12) - 1;
+
+ @JsonProperty("partitionFunction")
+ private PartitionFunction partitionFunction;
@JsonCreator
- public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) {
- super(oppositeMajorFragmentId, child, Collections.<MinorFragmentEndpoint>emptyList());
- this.partitions = partitions;
+ public RangePartitionSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId,
+ @JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("destinations") List<MinorFragmentEndpoint> endpoints,
+ @JsonProperty("partitionFunction") PartitionFunction partitionFunction) {
+ super(oppositeMajorFragmentId, child, endpoints);
+ this.partitionFunction = partitionFunction;
}
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new RangeSender(oppositeMajorFragmentId, child, partitions);
+ return new RangePartitionSender(oppositeMajorFragmentId, child, destinations, partitionFunction);
}
- public static class EndpointPartition{
- private final PartitionRange range;
- private final DrillbitEndpoint endpoint;
+ @JsonProperty("partitionFunction")
+ public PartitionFunction getPartitionFunction() {
+ return partitionFunction;
+ }
- @JsonCreator
- public EndpointPartition(@JsonProperty("range") PartitionRange range, @JsonProperty("endpoint") DrillbitEndpoint endpoint) {
- super();
- this.range = range;
- this.endpoint = endpoint;
- }
- public PartitionRange getRange() {
- return range;
- }
- public DrillbitEndpoint getEndpoint() {
- return endpoint;
- }
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitRangePartitionSender(this, value);
}
@Override
public int getOperatorType() {
- return CoreOperatorType.RANGE_SENDER_VALUE;
+ return CoreOperatorType.RANGE_PARTITION_SENDER_VALUE;
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index dc8dd0fd8..5ccf1c093 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,10 +83,14 @@ public class ScanBatch implements CloseableRecordBatch {
private final List<Map<String, String>> implicitColumnList;
private String currentReaderClassName;
private final RecordBatchStatsContext batchStatsContext;
+
// Represents last outcome of next(). If an Exception is thrown
// during the method's execution a value IterOutcome.STOP will be assigned.
private IterOutcome lastOutcome;
+ private List<RecordReader> readerList = null; // needed for repeatable scanners
+ private boolean isRepeatableScan = false; // needed for repeatable scanners
+
/**
*
* @param context
@@ -137,6 +141,15 @@ public class ScanBatch implements CloseableRecordBatch {
readers, Collections.<Map<String, String>> emptyList());
}
+ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+ List<RecordReader> readerList, boolean isRepeatableScan)
+ throws ExecutionSetupException {
+ this(context, context.newOperatorContext(subScanConfig),
+ readerList, Collections.<Map<String, String>> emptyList());
+ this.readerList = readerList;
+ this.isRepeatableScan = isRepeatableScan;
+ }
+
@Override
public FragmentContext getContext() {
return context;
@@ -255,7 +268,7 @@ public class ScanBatch implements CloseableRecordBatch {
return false;
}
currentReader = readers.next();
- if (readers.hasNext()) {
+ if (!isRepeatableScan && readers.hasNext()) {
readers.remove();
}
implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java
new file mode 100644
index 000000000..7b4dfcaaa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/RowKeyJoin.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.record.AbstractRecordBatch.BatchState;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Interface for a row key join. It exposes batches of row keys together with the
+ * {@link BatchState} and {@link RowKeyJoinState} that the join maintains.
+ */
+public interface RowKeyJoin {
+
+ /**
+ * Internal state of a RowKeyJoin. Possible states are {INITIAL, PROCESSING, DONE}.
+ *
+ * A RowKeyJoin starts in the INITIAL state. The RestrictedJsonRecordReader moves it to
+ * PROCESSING as soon as it processes the rows related to the row keys. The RowKeyJoin
+ * algorithm sets the state back to INITIAL when the leftStream has no more data: the
+ * leftStream is called multiple times depending on the rightStream, hence the transition
+ * from PROCESSING back to INITIAL. If there is no data from the rightStream, or an
+ * OutOfMemory condition occurs, the state is moved to DONE.
+ */
+ public enum RowKeyJoinState {
+ INITIAL, PROCESSING, DONE;
+ }
+
+ /**
+ * Is the next batch of row keys ready to be returned
+ * @return True if ready, false if not
+ */
+ public boolean hasRowKeyBatch();
+
+ /**
+ * Get the next batch of row keys
+ * @return a Pair whose left element is the ValueVector containing the row keys, right
+ * element is the number of row keys in this batch
+ */
+ public Pair<ValueVector, Integer> nextRowKeyBatch();
+
+
+ /**
+ * Get the current BatchState (this is useful when performing row key join)
+ * @return the current BatchState
+ */
+ public BatchState getBatchState();
+
+ /**
+ * Set the BatchState (this is useful when performing row key join)
+ * @param newState the BatchState to switch to
+ */
+ public void setBatchState(BatchState newState);
+
+ /**
+ * Set the RowKeyJoinState (this is useful for maintaining state for row key join algorithm)
+ * @param newState the RowKeyJoinState to switch to
+ */
+ public void setRowKeyJoinState(RowKeyJoinState newState);
+
+ /**
+ * Get the current RowKeyJoinState.
+ * @return the current RowKeyJoinState
+ */
+ public RowKeyJoinState getRowKeyJoinState();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 96c311216..b39328ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -396,4 +396,21 @@ public abstract class DrillRelOptUtil {
}
}
}
+
+ /**
+  * Checks whether any top-level expression of a project is a call to the FLATTEN function.
+  * Only the project's own child expressions are inspected; a flatten nested inside another
+  * call is not detected. The function name is compared case-insensitively.
+  *
+  * @param project the rel to inspect; expected to be a {@link Project} (checked by an assertion
+  *                that is active only when assertions are enabled)
+  * @return true if at least one child expression is a {@link RexCall} whose operator is named
+  *         "flatten", false otherwise
+  */
+ public static boolean isProjectFlatten(RelNode project) {
+
+   assert project instanceof Project : "Rel is NOT an instance of project!";
+
+   for (RexNode rex : project.getChildExps()) {
+     if (rex instanceof RexCall) {
+       String functionName = ((RexCall) rex).getOperator().getName();
+       if (functionName.equalsIgnoreCase("flatten")) {
+         return true;
+       }
+     }
+   }
+   return false;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java
new file mode 100644
index 000000000..d765162fb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PluginCost.java
@@ -0,0 +1,79 @@
+package org.apache.drill.exec.planner.cost;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.drill.exec.physical.base.GroupScan;
+
+/**
+ * PluginCost describes the cost factors to be used when costing for the specific storage/format plugin
+ */
+public interface PluginCost {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PluginCost.class);
+
+  /**
+   * An interface to check if a parameter provided by user is valid or not.
+   * @param <T> Type of the parameter.
+   */
+  interface CheckValid<T> {
+    /**
+     * @param paramValue the value to validate
+     * @return true if the value is acceptable, false otherwise
+     */
+    boolean isValid(T paramValue);
+  }
+
+  /**
+   * Class which checks whether the provided parameter value is greater than
+   * or equals to a minimum limit.
+   */
+  class greaterThanEquals implements CheckValid<Integer> {
+    private final Integer atleastEqualsTo;
+
+    /**
+     * @param atleast the minimum value (inclusive) that will be accepted
+     */
+    public greaterThanEquals(Integer atleast) {
+      atleastEqualsTo = atleast;
+    }
+
+    /**
+     * Accepts the value when it is greater than or equal to the configured minimum.
+     * A rejected value is reported with a warning that names the minimum limit.
+     *
+     * @param paramValue the value to validate
+     * @return true if {@code paramValue >= minimum}, false otherwise
+     */
+    @Override
+    public boolean isValid(Integer paramValue) {
+      // No upper-bound check is needed: every int is <= Integer.MAX_VALUE.
+      if (paramValue >= atleastEqualsTo) {
+        return true;
+      }
+      // Log the limit that was violated, not the rejected value itself.
+      logger.warn("Setting default value as the supplied parameter value is less than {}", atleastEqualsTo);
+      return false;
+    }
+  }
+
+  /**
+   * @param scan the group scan the cost factor is requested for
+   * @return the average column size in bytes
+   */
+  int getAverageColumnSize(GroupScan scan);
+
+  /**
+   * @param scan the group scan the cost factor is requested for
+   * @return the block size in bytes
+   */
+  int getBlockSize(GroupScan scan);
+
+  /**
+   * @param scan the group scan the cost factor is requested for
+   * @return the sequential block read cost
+   */
+  int getSequentialBlockReadCost(GroupScan scan);
+
+  /**
+   * @param scan the group scan the cost factor is requested for
+   * @return the random block read cost
+   */
+  int getRandomBlockReadCost(GroupScan scan);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java
new file mode 100644
index 000000000..9894b3263
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexCollection.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import java.util.Iterator;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+
+/**
+ * Skeleton implementation of an index collection: it keeps the index descriptors in a list and
+ * answers every optional capability query with "not supported". Subclasses override the
+ * capabilities they actually provide.
+ */
+public abstract class AbstractIndexCollection implements IndexCollection, Iterable<IndexDescriptor> {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractIndexCollection.class);
+
+  /**
+   * The index descriptors held by this collection (a set of indexes for a particular table).
+   */
+  @JsonProperty
+  protected List<IndexDescriptor> indexes;
+
+  /** Creates a collection backed by a new, empty list. */
+  public AbstractIndexCollection() {
+    this.indexes = Lists.newArrayList();
+  }
+
+  /** Appends the descriptor to the backing list and returns the list's result. */
+  @Override
+  public boolean addIndex(IndexDescriptor index) {
+    final boolean added = indexes.add(index);
+    return added;
+  }
+
+  /** Removes the descriptor from the backing list; returns true if it was present. */
+  @Override
+  public boolean removeIndex(IndexDescriptor index) {
+    final boolean removed = indexes.remove(index);
+    return removed;
+  }
+
+  /** Drops every descriptor from the backing list. */
+  @Override
+  public void clearAll() {
+    indexes.clear();
+  }
+
+  /** @return false; index selection is not supported by default */
+  @Override
+  public boolean supportsIndexSelection() {
+    return false;
+  }
+
+  /**
+   * Not supported by default.
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public double getRows(RexNode indexCondition) {
+    throw new UnsupportedOperationException("getRows() not supported for this index collection.");
+  }
+
+  /** @return false; row count statistics are not supported by default */
+  @Override
+  public boolean supportsRowCountStats() {
+    return false;
+  }
+
+  /** @return false; full text search is not supported by default */
+  @Override
+  public boolean supportsFullTextSearch() {
+    return false;
+  }
+
+  /**
+   * @param path the column to look up
+   * @return true if any descriptor reports a non-negative index column ordinal for the path
+   */
+  @Override
+  public boolean isColumnIndexed(SchemaPath path) {
+    return indexes.stream()
+        .anyMatch(descriptor -> descriptor.getIndexColumnOrdinal(path) >= 0);
+  }
+
+  /** @return an iterator over the backing list of descriptors */
+  @Override
+  public Iterator<IndexDescriptor> iterator() {
+    return indexes.iterator();
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
new file mode 100644
index 000000000..f908ead4c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+
+/**
+ * Abstract base class for an Index descriptor. It forwards the index definition to
+ * {@link DrillIndexDefinition} and supplies "not supported" defaults for the statistics,
+ * group scan, full text search and cost operations.
+ */
+public abstract class AbstractIndexDescriptor extends DrillIndexDefinition implements IndexDescriptor {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractIndexDescriptor .class);
+
+ /**
+ * Passes all arguments, unchanged and in the same order, to the DrillIndexDefinition constructor.
+ */
+ public AbstractIndexDescriptor(List<LogicalExpression> indexCols,
+ CollationContext indexCollationContext,
+ List<LogicalExpression> nonIndexCols,
+ List<LogicalExpression> rowKeyColumns,
+ String indexName,
+ String tableName,
+ IndexType type,
+ NullDirection nullsDirection) {
+ super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection);
+ }
+
+ /**
+ * Not supported by default.
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public double getRows(RelNode scan, RexNode indexCondition) {
+ throw new UnsupportedOperationException("getRows() not supported for this index.");
+ }
+
+ /**
+ * @return false; row count statistics are not supported by default
+ */
+ @Override
+ public boolean supportsRowCountStats() {
+ return false;
+ }
+
+ /**
+ * Not supported by default.
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public IndexGroupScan getIndexGroupScan() {
+ throw new UnsupportedOperationException("Group scan not supported for this index.");
+ }
+
+ /**
+ * @return false; full text search is not supported by default
+ */
+ @Override
+ public boolean supportsFullTextSearch() {
+ return false;
+ }
+
+ /**
+ * Not supported by default.
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+ int numProjectedFields, GroupScan primaryGroupScan) {
+ throw new UnsupportedOperationException("getCost() not supported for this index.");
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java
new file mode 100644
index 000000000..dfc0897ee
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/AbstractIndexStatistics.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Base class for index statistics. It stores the input rel, the condition and the table passed
+ * at construction, leaves the row count to subclasses, and rejects collation and distribution
+ * queries by default.
+ */
+public abstract class AbstractIndexStatistics implements IndexStatistics {
+
+ protected static final Logger logger = LoggerFactory.getLogger(AbstractIndexStatistics.class);
+ protected final RelNode input;
+ protected final RexNode condition;
+ protected final DrillTable table;
+
+ /**
+ * Stores the three arguments in the corresponding final fields without further processing.
+ *
+ * @param input rel node kept in {@code input}
+ * @param condition expression kept in {@code condition}
+ * @param table table kept in {@code table}
+ */
+ public AbstractIndexStatistics(RelNode input, RexNode condition, DrillTable table) {
+ this.input = input;
+ this.condition = condition;
+ this.table = table;
+ }
+ /**
+ * @return the row count; must be provided by the concrete subclass
+ */
+ public abstract double getRowCount();
+
+ /**
+ * Not supported by default.
+ * @throws UnsupportedOperationException always
+ */
+ public List<RelCollation> getCollations() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Not supported by default.
+ * @throws UnsupportedOperationException always
+ */
+ public RelDistribution getDistribution() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java
new file mode 100644
index 000000000..8260beea4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/CollationContext.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.drill.common.expression.LogicalExpression;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holder for the collation information of an index: a map from expression to field collation
+ * and a list of field collations. Both references are stored as given (no defensive copy) and
+ * exposed as public final fields.
+ */
+public class CollationContext {
+
+ // Field collation keyed by expression.
+ public final Map<LogicalExpression, RelFieldCollation> collationMap;
+ // Field collations as a list.
+ public final List<RelFieldCollation> relFieldCollations;
+
+ /**
+ * @param collationMap map stored in {@link #collationMap}
+ * @param relFieldCollations list stored in {@link #relFieldCollations}
+ */
+ public CollationContext(Map<LogicalExpression, RelFieldCollation> collationMap,
+ List<RelFieldCollation> relFieldCollations) {
+ this.collationMap = collationMap;
+ this.relFieldCollations = relFieldCollations;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java
new file mode 100644
index 000000000..0ea3d83e7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexCollection.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.Set;
+
+/**
+ * Index collection bound to the scan rel of a primary table. It reports support for index
+ * selection, row count statistics and full text search, and answers row count and group scan
+ * requests through the first descriptor in the collection.
+ */
+public class DrillIndexCollection extends AbstractIndexCollection {
+  private final RelNode scan;  // physical scan rel corresponding to the primary table
+
+  /**
+   * @param scanRel scan rel of the primary table
+   * @param indexes descriptors to register; each one is added through the superclass addIndex()
+   */
+  public DrillIndexCollection(RelNode scanRel,
+                              Set<DrillIndexDescriptor> indexes) {
+    this.scan = scanRel;
+    indexes.forEach(super::addIndex);
+  }
+
+  /**
+   * Picks the descriptor used to answer statistics and group scan requests.
+   * The iterator's next() fails if the collection is empty.
+   */
+  private IndexDescriptor getIndexDescriptor() {
+    //XXX need a policy to pick the indexDesc to use instead of picking the first one.
+    final Iterator<IndexDescriptor> descriptors = this.indexes.iterator();
+    return descriptors.next();
+  }
+
+  /** @return true; index selection is supported */
+  @Override
+  public boolean supportsIndexSelection() {
+    return true;
+  }
+
+  /** @return true; row count statistics are supported */
+  @Override
+  public boolean supportsRowCountStats() {
+    return true;
+  }
+
+  /** @return true; full text search is supported */
+  @Override
+  public boolean supportsFullTextSearch() {
+    return true;
+  }
+
+  /** @return the row count reported by the selected descriptor for this collection's scan */
+  @Override
+  public double getRows(RexNode indexCondition) {
+    final IndexDescriptor descriptor = getIndexDescriptor();
+    return descriptor.getRows(scan, indexCondition);
+  }
+
+  /** @return the index group scan of the selected descriptor */
+  @Override
+  public IndexGroupScan getGroupScan() {
+    final IndexDescriptor descriptor = getIndexDescriptor();
+    return descriptor.getIndexGroupScan();
+  }
+
+  /** @return EXTERNAL_SECONDARY_INDEX_COLLECTION */
+  @Override
+  public IndexCollectionType getIndexCollectionType() {
+    return IndexCollectionType.EXTERNAL_SECONDARY_INDEX_COLLECTION;
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java
new file mode 100644
index 000000000..03c2a44c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDefinition.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Definition of an index: its indexed columns, its included (covering) columns, the row key
+ * columns, collation information, and the index/table names and type.
+ */
+public class DrillIndexDefinition implements IndexDefinition {
+  /**
+   * The indexColumns is the list of column(s) on which this index is created. If there is more than 1 column,
+   * the order of the columns is important: index on {a, b} is not the same as index on {b, a}
+   * NOTE: the indexed column could be of type columnfamily.column
+   */
+  @JsonProperty
+  protected final List<LogicalExpression> indexColumns;
+
+  /**
+   * nonIndexColumns: the list of columns that are included in the index as 'covering'
+   * columns but are not themselves indexed.  These are useful for covering indexes where the
+   * query request can be satisfied directly by the index and avoid accessing the table altogether.
+   */
+  @JsonProperty
+  protected final List<LogicalExpression> nonIndexColumns;
+
+  /** Union of indexColumns and nonIndexColumns, built once in the constructor. */
+  @JsonIgnore
+  protected final Set<LogicalExpression> allIndexColumns;
+
+  @JsonProperty
+  protected final List<LogicalExpression> rowKeyColumns;
+
+  /** Collation information of the index; may be null (see {@link #getCollation()}). */
+  @JsonProperty
+  protected final CollationContext indexCollationContext;
+
+  /**
+   * indexName: name of the index that should be unique within the scope of a table
+   */
+  @JsonProperty
+  protected final String indexName;
+
+  protected final String tableName;
+
+  @JsonProperty
+  protected final IndexDescriptor.IndexType indexType;
+
+  @JsonProperty
+  protected final NullDirection nullsDirection;
+
+  /**
+   * Stores the given references as-is and builds the set of all columns present in the index
+   * (indexed plus non-indexed).
+   *
+   * @param indexCols             indexed columns, in index order
+   * @param indexCollationContext collation information, may be null
+   * @param nonIndexCols          included but not indexed columns
+   * @param rowKeyColumns         row key columns
+   * @param indexName             name of the index
+   * @param tableName             name of the table
+   * @param type                  type of the index
+   * @param nullsDirection        ordering direction of nulls
+   */
+  public DrillIndexDefinition(List<LogicalExpression> indexCols,
+                              CollationContext indexCollationContext,
+                              List<LogicalExpression> nonIndexCols,
+                              List<LogicalExpression> rowKeyColumns,
+                              String indexName,
+                              String tableName,
+                              IndexType type,
+                              NullDirection nullsDirection) {
+    this.indexColumns = indexCols;
+    this.nonIndexColumns = nonIndexCols;
+    this.rowKeyColumns = rowKeyColumns;
+    this.indexName = indexName;
+    this.tableName = tableName;
+    this.indexType = type;
+    this.allIndexColumns = Sets.newHashSet(indexColumns);
+    this.allIndexColumns.addAll(nonIndexColumns);
+    this.indexCollationContext = indexCollationContext;
+    this.nullsDirection = nullsDirection;
+  }
+
+  /**
+   * @return the position of the expression in the indexed columns, or -1 if it is not indexed
+   */
+  @Override
+  public int getIndexColumnOrdinal(LogicalExpression path) {
+    return indexColumns.indexOf(path);
+  }
+
+  /**
+   * @return true if every given column is either an indexed or an included column of this index
+   */
+  @Override
+  public boolean isCoveringIndex(List<LogicalExpression> columns) {
+    return allIndexColumns.containsAll(columns);
+  }
+
+  /**
+   * @return true if every given column matches an indexed column
+   */
+  @Override
+  public boolean allColumnsIndexed(Collection<LogicalExpression> columns) {
+    return columnsInIndexFields(columns, indexColumns);
+  }
+
+  /**
+   * @return true if at least one given column matches an indexed column
+   */
+  @Override
+  public boolean someColumnsIndexed(Collection<LogicalExpression> columns) {
+    return someColumnsInIndexFields(columns, indexColumns);
+  }
+
+  /**
+   * Checks whether the path appears among the given expressions. Only SchemaPath entries are
+   * considered, and paths are compared through their toExpr() representation.
+   *
+   * @param path  the path to look for
+   * @param exprs the expressions to search
+   * @return true if a SchemaPath with the same toExpr() value is found
+   */
+  public boolean pathExactIn(SchemaPath path, Collection<LogicalExpression> exprs) {
+    for (LogicalExpression expr : exprs) {
+      if (expr instanceof SchemaPath) {
+        if (((SchemaPath) expr).toExpr().equals(path.toExpr())) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Checks whether a cast expression matches one of the index fields: the field must be of the
+   * same class as the cast expression, have an equal input, and cast to the same minor type.
+   */
+  boolean castIsCompatible(CastExpression castExpr, Collection<LogicalExpression> indexFields) {
+    for (LogicalExpression indexExpr : indexFields) {
+      if (indexExpr.getClass() != castExpr.getClass()) {
+        continue;
+      }
+      CastExpression indexCastExpr = (CastExpression) indexExpr;
+      //we compare input using equals because we know we are comparing SchemaPath,
+      //if we extend to support other expression, make sure the equals of that expression
+      //is implemented properly, otherwise it will fall to identity comparison
+      if (!castExpr.getInput().equals(indexCastExpr.getInput())) {
+        continue;
+      }
+
+      if (castExpr.getMajorType().getMinorType() != indexCastExpr.getMajorType().getMinorType()) {
+        continue;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @return true only if every column matches an index field. A cast expression must be
+   *         compatible with an indexed cast; a schema path must be present exactly; any other
+   *         kind of expression is treated as not indexed.
+   */
+  protected boolean columnsInIndexFields(Collection<LogicalExpression> columns, Collection<LogicalExpression> indexFields) {
+    //we need to do extra check, so we could allow the case when query condition expression is not identical with indexed fields
+    //and they still could use the index either by implicit cast or the difference is allowed, e.g. width of varchar
+    for (LogicalExpression col : columns) {
+      if (col instanceof CastExpression) {
+        if (!castIsCompatible((CastExpression) col, indexFields)) {
+          return false;
+        }
+      } else if (!(col instanceof SchemaPath) || !pathExactIn((SchemaPath) col, indexFields)) {
+        // Guard the cast: an expression that is neither a cast nor a path cannot match.
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @return true if at least one column matches an index field, using the same matching rules
+   *         as {@link #columnsInIndexFields(Collection, Collection)}
+   */
+  protected boolean someColumnsInIndexFields(Collection<LogicalExpression> columns,
+      Collection<LogicalExpression> indexFields) {
+
+    //we need to do extra check, so we could allow the case when query condition expression is not identical with indexed fields
+    //and they still could use the index either by implicit cast or the difference is allowed, e.g. width of varchar
+    for (LogicalExpression col : columns) {
+      if (col instanceof CastExpression) {
+        if (castIsCompatible((CastExpression) col, indexFields)) {
+          return true;
+        }
+      } else if (col instanceof SchemaPath && pathExactIn((SchemaPath) col, indexFields)) {
+        // Guard the cast: an expression that is neither a cast nor a path is skipped.
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    String columnsDesc = " Index columns: " + indexColumns.toString() + " Non-Index columns: " + nonIndexColumns.toString();
+    String desc = "Table: " + tableName + " Index: " + indexName + columnsDesc;
+    return desc;
+  }
+
+  /**
+   * Two definitions are equal when table name, index name, index type and indexed columns are
+   * equal. Subclass instances compare equal to base instances with the same values; objects of
+   * unrelated types (and null) are never equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // instanceof also rejects null and prevents a ClassCastException on foreign types.
+    if (!(o instanceof DrillIndexDefinition)) {
+      return false;
+    }
+    DrillIndexDefinition index1 = (DrillIndexDefinition) o;
+    return tableName.equals(index1.tableName)
+        && indexName.equals(index1.indexName)
+        && indexType.equals(index1.indexType)
+        && indexColumns.equals(index1.indexColumns);
+  }
+
+  /**
+   * Hash over table name, index name and index type; a subset of the fields used by equals().
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    final String fullName = tableName + indexName;
+    int result = 1;
+    result = prime * result + fullName.hashCode();
+    result = prime * result + indexType.hashCode();
+
+    return result;
+  }
+
+  @Override
+  @JsonProperty
+  public String getIndexName() {
+    return indexName;
+  }
+
+  @Override
+  public String getTableName() {
+    return tableName;
+  }
+
+  @Override
+  @JsonProperty
+  public IndexDescriptor.IndexType getIndexType() {
+    return indexType;
+  }
+
+  @Override
+  @JsonProperty
+  public List<LogicalExpression> getRowKeyColumns() {
+    return this.rowKeyColumns;
+  }
+
+  @Override
+  @JsonProperty
+  public List<LogicalExpression> getIndexColumns() {
+    return this.indexColumns;
+  }
+
+  @Override
+  @JsonProperty
+  public List<LogicalExpression> getNonIndexColumns() {
+    return this.nonIndexColumns;
+  }
+
+  /**
+   * @return the collation built from the context's field collations, or null if this
+   *         definition has no collation context
+   */
+  @Override
+  @JsonIgnore
+  public RelCollation getCollation() {
+    if (indexCollationContext != null) {
+      return RelCollations.of(indexCollationContext.relFieldCollations);
+    }
+    return null;
+  }
+
+  /**
+   * @return the expression-to-collation map of the context, or null if this definition has no
+   *         collation context (same convention as {@link #getCollation()})
+   */
+  @Override
+  @JsonIgnore
+  public Map<LogicalExpression, RelFieldCollation> getCollationMap() {
+    if (indexCollationContext != null) {
+      return indexCollationContext.collationMap;
+    }
+    return null;
+  }
+
+  @Override
+  @JsonIgnore
+  public NullDirection getNullsOrderingDirection() {
+    return nullsDirection;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java
new file mode 100644
index 000000000..4da62c204
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/DrillIndexDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.drill.exec.planner.cost.PluginCost;
+import org.apache.drill.exec.planner.logical.DrillTable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Index descriptor that can be attached to a storage plugin name and a DrillTable, from which
+ * the index group scan is obtained.
+ */
+public class DrillIndexDescriptor extends AbstractIndexDescriptor {
+
+  /**
+   * The name of Drill's Storage Plugin on which the Index was stored
+   */
+  private String storage;
+
+  /** Table set through attach() or setDrillTable(). */
+  private DrillTable table;
+
+  /** Passes all arguments, unchanged and in the same order, to the superclass constructor. */
+  public DrillIndexDescriptor(List<LogicalExpression> indexCols,
+                              CollationContext indexCollationContext,
+                              List<LogicalExpression> nonIndexCols,
+                              List<LogicalExpression> rowKeyColumns,
+                              String indexName,
+                              String tableName,
+                              IndexType type,
+                              NullDirection nullsDirection) {
+    super(indexCols, indexCollationContext, nonIndexCols, rowKeyColumns, indexName, tableName, type, nullsDirection);
+  }
+
+  /** Builds a descriptor from the fields and getters of an existing definition. */
+  public DrillIndexDescriptor(DrillIndexDefinition def) {
+    this(def.indexColumns,
+        def.indexCollationContext,
+        def.nonIndexColumns,
+        def.rowKeyColumns,
+        def.indexName,
+        def.getTableName(),
+        def.getIndexType(),
+        def.nullsDirection);
+  }
+
+  /** @return a fixed placeholder value of 1.0 */
+  @Override
+  public double getRows(RelNode scan, RexNode indexCondition) {
+    //TODO: real implementation is to use Drill's stats implementation. for now return fake value 1.0
+    return 1.0;
+  }
+
+  /**
+   * @return the group scan of the attached table when it is an IndexGroupScan; null (after
+   *         logging an error) when it is of another type or an IOException occurs
+   */
+  @Override
+  public IndexGroupScan getIndexGroupScan() {
+    try {
+      final DrillTable idxTable = getDrillTable();
+      final GroupScan candidate = idxTable.getGroupScan();
+
+      if (candidate instanceof IndexGroupScan) {
+        return (IndexGroupScan) candidate;
+      }
+      logger.error("The Groupscan from table {} is not an IndexGroupScan", idxTable.toString());
+    } catch (IOException e) {
+      logger.error("Error in getIndexGroupScan ", e);
+    }
+    return null;
+  }
+
+  /** Records the storage name and sets the table through setDrillTable(). */
+  public void attach(String storageName, DrillTable inTable) {
+    this.storage = storageName;
+    setDrillTable(inTable);
+  }
+
+  public void setStorageName(String storageName) {
+    this.storage = storageName;
+  }
+
+  public String getStorageName() {
+    return this.storage;
+  }
+
+  public void setDrillTable(DrillTable table) {
+    this.table = table;
+  }
+
+  public DrillTable getDrillTable() {
+    return table;
+  }
+
+  /** @return null; this descriptor provides no functional index info */
+  public FunctionalIndexInfo getFunctionalInfo() {
+    return null;
+  }
+
+  /** @return null; this descriptor provides no plugin cost model */
+  @Override
+  public PluginCost getPluginCostModel() {
+    return null;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java
new file mode 100644
index 000000000..a12dcc6c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/FunctionalIndexInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * FunctionalIndexInfo collects the functional fields of an IndexDescriptor and derives the information needed for
+ * index planning, e.g. to convert and rewrite the filter, columns, and row type of an index scan that involves a
+ * functional index. Because different stores might rename expressions in the index table in different ways, this is
+ * an interface so that each storage plugin can supply its own implementation.
+ */
+public interface FunctionalIndexInfo {
+
+ /**
+ * @return true if this index has a functional indexed field
+ */
+ boolean hasFunctional();
+
+ /**
+ * @return the IndexDescriptor this FunctionalIndexInfo was built from
+ */
+ IndexDescriptor getIndexDesc();
+
+ /**
+ * getNewPath: for an original path, return the new renamed '$N' path. Note that there could be multiple renamed
+ * paths if multiple functional indexes refer to the original path.
+ * @param path the original schema path
+ * @return the renamed path for the given original path
+ */
+ SchemaPath getNewPath(SchemaPath path);
+
+ /**
+ * Return a plain field path if the incoming index expression 'expr' is replaced by a plain field.
+ * @param expr supposed to be an indexed expression
+ * @return the renamed schema path in the index table for the indexed expression
+ */
+ SchemaPath getNewPathFromExpr(LogicalExpression expr);
+
+ /**
+ * @return the map of indexed expression --> the schema paths involved in that indexed expression
+ */
+ Map<LogicalExpression, Set<SchemaPath>> getPathsInFunctionExpr();
+
+ /**
+ * @return the map between an indexed expression and the to-be-converted target expression for the scan on the
+ * index, e.g. cast(a.b as int) -> '$0'
+ */
+ Map<LogicalExpression, LogicalExpression> getExprMap();
+
+ /**
+ * @return the set of all new field names for indexed functions in the index
+ */
+ Set<SchemaPath> allNewSchemaPaths();
+
+ /**
+ * @return the set of all schema paths that exist in functional index fields
+ */
+ Set<SchemaPath> allPathsInFunction();
+
+ /**
+ * Whether this implementation (which may differ per storage) supports rewriting a varchar equality expression,
+ * e.g. cast(a.b as varchar(2)) = 'ca', to a LIKE expression: cast(a.b as varchar(2)) LIKE 'ca%'
+ */
+ boolean supportEqualCharConvertToLike();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
new file mode 100644
index 000000000..65788cb52
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCallContext.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Accessors over the state an index planning rule invocation works with: the rule call itself, the scan
+ * and its DbGroupScan, the lower/upper projects (whose presence is reported by hasLowerProject and
+ * hasUpperProject), the filter, sort and exchange rels, plus the conditions, collations, sort expressions,
+ * scan columns and distribution fields associated with them.
+ * NOTE(review): the exact meaning, nullability and ordering of each returned value are defined by the
+ * implementing classes, which are not part of this file - confirm there before relying on them.
+ */
+public interface IndexCallContext {
+ DrillScanRelBase getScan();
+
+ DbGroupScan getGroupScan();
+
+ List<RelCollation> getCollationList();
+
+ RelCollation getCollation();
+
+ boolean hasLowerProject();
+
+ boolean hasUpperProject();
+
+ RelOptRuleCall getCall();
+
+ Set<LogicalExpression> getLeftOutPathsInFunctions();
+
+ RelNode getFilter();
+
+ IndexableExprMarker getOrigMarker();
+
+ List<LogicalExpression> getSortExprs();
+
+ DrillProjectRelBase getLowerProject();
+
+ DrillProjectRelBase getUpperProject();
+
+ void setLeftOutPathsInFunctions(Set<LogicalExpression> exprs);
+
+ List<SchemaPath> getScanColumns();
+
+ RexNode getFilterCondition();
+
+ RexNode getOrigCondition();
+
+ Sort getSort();
+
+ void createSortExprs();
+
+ RelNode getExchange();
+
+ List<DistributionField> getDistributionFields();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java
new file mode 100644
index 000000000..9b4d170e0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexCollection.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+
+/**
+ * Describes a collection of indexes; iterating over it yields the contained {@link IndexDescriptor}s.
+ */
+public interface IndexCollection extends Iterable<IndexDescriptor> {
+
+  /**
+   * Types of an index collection: NATIVE_SECONDARY_INDEX_COLLECTION, EXTERNAL_SECONDARY_INDEX_COLLECTION
+   */
+  enum IndexCollectionType {
+    NATIVE_SECONDARY_INDEX_COLLECTION,
+    EXTERNAL_SECONDARY_INDEX_COLLECTION
+  }
+
+  /**
+   * Add a new index to the collection.
+   * @param index the index to add
+   * @return True if the index was successfully added; False otherwise
+   */
+  boolean addIndex(IndexDescriptor index);
+
+  /**
+   * Remove an index (identified by table name and index name) from the collection.
+   * @param index the index to remove
+   * @return True if the index was successfully removed; False otherwise
+   */
+  boolean removeIndex(IndexDescriptor index);
+
+  /**
+   * Clears all entries from this index collection.
+   */
+  void clearAll();
+
+  /**
+   * Get the type of this index collection based on {@link IndexCollectionType}.
+   * @return one of the values in {@link IndexCollectionType}
+   */
+  IndexCollectionType getIndexCollectionType();
+
+  /**
+   * Whether or not this index collection supports index selection (selecting an
+   * appropriate index out of multiple candidates). Typically, external index collections
+   * such as Elasticsearch already have this capability while a native secondary index collection
+   * may not - in such cases, Drill needs to do the index selection.
+   * @return True if the collection can select an index itself, False otherwise
+   */
+  boolean supportsIndexSelection();
+
+  /**
+   * Get the estimated row count for a single index condition.
+   * @param indexCondition The index condition (e.g. {@code index_col1 < 10 AND index_col2 = 'abc'})
+   * @return The estimated row count
+   */
+  double getRows(RexNode indexCondition);
+
+  /**
+   * Whether or not the index supports getting row count statistics.
+   * @return True if index supports getting row count, False otherwise
+   */
+  boolean supportsRowCountStats();
+
+  /**
+   * Whether or not the index supports full-text search (to allow pushing down such filters).
+   * @return True if index supports full-text search, False otherwise
+   */
+  boolean supportsFullTextSearch();
+
+  /**
+   * If this IndexCollection exposes a single GroupScan, return the GroupScan instance. For external indexes
+   * such as Elasticsearch, we may have a single GroupScan representing all the indexes contained
+   * within that collection. On the other hand, for native indexes, each separate index would
+   * have its own GroupScan.
+   * @return GroupScan for this IndexCollection if available, otherwise null
+   */
+  IndexGroupScan getGroupScan();
+
+  /**
+   * Check if the field name is the leading key of any of the indexes in this collection.
+   * @param path the field path to look up
+   * @return True if an appropriate index is found, False otherwise
+   */
+  boolean isColumnIndexed(SchemaPath path);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java
new file mode 100644
index 000000000..995d23c57
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDefinition.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
+import org.apache.drill.common.expression.LogicalExpression;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Defines a single index: its name, table, type, columns and collation.
+ */
+public interface IndexDefinition {
+
+  /**
+   * Types of an index: PRIMARY_KEY_INDEX, NATIVE_SECONDARY_INDEX, EXTERNAL_SECONDARY_INDEX
+   */
+  enum IndexType {
+    PRIMARY_KEY_INDEX,
+    NATIVE_SECONDARY_INDEX,
+    EXTERNAL_SECONDARY_INDEX
+  }
+
+  /**
+   * Check to see if the field name is an index column and if so return the ordinal position in the index.
+   * @param path The field path you want to compare to index column names.
+   * @return the ordinal of the indexed column if valid, otherwise -1
+   */
+  int getIndexColumnOrdinal(LogicalExpression path);
+
+  /**
+   * Get the name of the index.
+   * @return the index name
+   */
+  String getIndexName();
+
+  /**
+   * Check if this index 'covers' all the columns specified in the supplied list of columns.
+   * @param columns the columns to check
+   * @return True for covering index, False for non-covering
+   */
+  boolean isCoveringIndex(List<LogicalExpression> columns);
+
+  /**
+   * Check if this index has all the columns specified in the supplied collection of columns indexed.
+   * @param columns the columns to check
+   * @return True if all fields are indexed, False if some or all of the fields are not indexed
+   */
+  boolean allColumnsIndexed(Collection<LogicalExpression> columns);
+
+  /**
+   * Check if this index has some of the columns specified in the supplied collection of columns indexed.
+   * @param columns the columns to check
+   * @return True if some fields are indexed, False if none of the fields are indexed
+   */
+  boolean someColumnsIndexed(Collection<LogicalExpression> columns);
+
+  /**
+   * Get the list of columns (typically 1 column) that constitute the row key (primary key).
+   * @return the row key columns
+   */
+  List<LogicalExpression> getRowKeyColumns();
+
+  /**
+   * Get the name of the table this index is associated with.
+   * @return the table name
+   */
+  String getTableName();
+
+  /**
+   * Get the type of this index based on {@link IndexType}.
+   * @return one of the values in {@link IndexType}
+   */
+  IndexType getIndexType();
+
+  /**
+   * @return the index columns of this index
+   */
+  List<LogicalExpression> getIndexColumns();
+
+  /**
+   * @return the non-index columns of this index
+   *         (NOTE(review): presumably the extra columns stored with the index but not part of its key - confirm)
+   */
+  List<LogicalExpression> getNonIndexColumns();
+
+  /**
+   * @return the collation of this index
+   */
+  RelCollation getCollation();
+
+  /**
+   * @return a map from expression to its field collation
+   */
+  Map<LogicalExpression, RelFieldCollation> getCollationMap();
+
+  /**
+   * Get the nulls ordering of this index.
+   * @return the {@link NullDirection} used by this index
+   */
+  NullDirection getNullsOrderingDirection();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
new file mode 100644
index 000000000..f355285cc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.IndexGroupScan;
+import org.apache.drill.exec.planner.cost.PluginCost;
+
+
+/**
+ * IndexDefinition + functions to access the materialized index (index table/scan, etc).
+ */
+public interface IndexDescriptor extends IndexDefinition {
+
+  /**
+   * Get the estimated row count for a single index condition.
+   * @param input The rel node corresponding to the primary table
+   * @param indexCondition The index condition (e.g. {@code index_col1 < 10 AND index_col2 = 'abc'})
+   * @return The estimated row count
+   */
+  double getRows(RelNode input, RexNode indexCondition);
+
+  /**
+   * Whether or not the index supports getting row count statistics.
+   * @return True if index supports getting row count, False otherwise
+   */
+  boolean supportsRowCountStats();
+
+  /**
+   * Get an instance of the group scan associated with this index descriptor.
+   * @return An instance of group scan for this index
+   */
+  IndexGroupScan getIndexGroupScan();
+
+  /**
+   * Whether or not the index supports full-text search (to allow pushing down such filters).
+   * @return True if index supports full-text search, False otherwise
+   */
+  boolean supportsFullTextSearch();
+
+  /**
+   * @return the {@link FunctionalIndexInfo} of this index
+   */
+  FunctionalIndexInfo getFunctionalInfo();
+
+  /**
+   * Get the cost associated with this index.
+   * NOTE(review): the parameter roles below are inferred from their names and types - confirm against
+   * the implementations.
+   * @param indexProps properties of the index for the current query
+   * @param planner the planner used to produce the cost
+   * @param numProjectedFields number of projected fields
+   * @param primaryGroupScan group scan of the primary table
+   * @return the cost
+   */
+  RelOptCost getCost(IndexProperties indexProps, RelOptPlanner planner,
+      int numProjectedFields, GroupScan primaryGroupScan);
+
+  /**
+   * @return the {@link PluginCost} model of this index
+   */
+  PluginCost getPluginCostModel();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java
new file mode 100644
index 000000000..309083b1b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscover.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+/**
+ * Looks up the indexes of a table.
+ */
+public interface IndexDiscover {
+ /**
+ * @param tableName name of the table whose indexes are wanted
+ * @return the IndexCollection for the given table
+ *         (NOTE(review): what is returned for a table without indexes is implementation-defined - confirm)
+ */
+ IndexCollection getTableIndex(String tableName);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java
new file mode 100644
index 000000000..fde2a32d2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverBase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.exec.physical.base.AbstractDbGroupScan;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.calcite.rel.RelNode;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * IndexDiscoverBase is the layer to read index configurations of tables on storage plugins,
+ * then based on the properties it collected, get the StoragePlugin from StoragePluginRegistry,
+ * together with indexes information, build an IndexCollection.
+ */
+public abstract class IndexDiscoverBase implements IndexDiscover {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IndexDiscoverBase.class);
+
+  private final AbstractDbGroupScan scan; // group scan corresponding to the primary table
+  private final RelNode scanRel; // physical scan rel corresponding to the primary table
+
+  /**
+   * @param inScan group scan of the primary table
+   * @param inScanPrel scan rel of the primary table
+   */
+  public IndexDiscoverBase(AbstractDbGroupScan inScan, DrillScanRelBase inScanPrel) {
+    scan = inScan;
+    scanRel = inScanPrel;
+  }
+
+  /**
+   * @param inScan group scan of the primary table
+   * @param inScanPrel physical scan rel of the primary table
+   */
+  public IndexDiscoverBase(AbstractDbGroupScan inScan, ScanPrel inScanPrel) {
+    scan = inScan;
+    scanRel = inScanPrel;
+  }
+
+  /**
+   * @return the group scan of the primary table given at construction
+   */
+  public AbstractDbGroupScan getOriginalScan() {
+    return scan;
+  }
+
+  /**
+   * @return the scan rel of the primary table given at construction
+   */
+  public RelNode getOriginalScanRel() {
+    return scanRel;
+  }
+
+  /**
+   * Builds an IndexCollection holding one materialized DrillIndexDescriptor per supplied definition.
+   * @param tableName name of the primary table (not used by this implementation)
+   * @param storageName storage name recorded on every descriptor
+   * @param indexDefs the index definitions to turn into descriptors
+   * @return a DrillIndexCollection over the original scan rel containing the created descriptors
+   */
+  public IndexCollection getTableIndex(String tableName, String storageName, Collection<DrillIndexDefinition> indexDefs) {
+    Set<DrillIndexDescriptor> idxSet = new HashSet<>();
+    for (DrillIndexDefinition def : indexDefs) {
+      DrillIndexDescriptor indexDescriptor = new DrillIndexDescriptor(def);
+      materializeIndex(storageName, indexDescriptor);
+      // Collect the descriptor; without this the returned collection would always be empty.
+      idxSet.add(indexDescriptor);
+    }
+    return new DrillIndexCollection(getOriginalScanRel(), idxSet);
+  }
+
+  /**
+   * Records the storage name on the descriptor and sets its DrillTable to the result of
+   * {@link #buildDrillTable(IndexDescriptor)}.
+   * @param storageName storage name to record
+   * @param index the descriptor to materialize
+   */
+  public void materializeIndex(String storageName, DrillIndexDescriptor index) {
+    index.setStorageName(storageName);
+    index.setDrillTable(buildDrillTable(index));
+  }
+
+  /**
+   * When there is storageName in IndexDescriptor, get a DrillTable instance based on the
+   * StorageName and other information in idxDesc that helps identifying the table.
+   * This base implementation is a placeholder and always returns null.
+   * @param idxDesc descriptor of the external index
+   * @return the DrillTable of the index; null in this base implementation
+   */
+  public DrillTable getExternalDrillTable(IndexDescriptor idxDesc) {
+    //XX: get table object for this index, index storage plugin should provide interface to get the DrillTable object
+    return null;
+  }
+
+  /**
+   * Builds the DrillTable of an index: an EXTERNAL_SECONDARY_INDEX is resolved through
+   * {@link #getExternalDrillTable(IndexDescriptor)}, every other index type through
+   * {@link #getNativeDrillTable(IndexDescriptor)}.
+   * @param idxDesc descriptor of the index
+   * @return the DrillTable returned by the chosen method
+   */
+  public DrillTable buildDrillTable(IndexDescriptor idxDesc) {
+    if (idxDesc.getIndexType() == IndexDescriptor.IndexType.EXTERNAL_SECONDARY_INDEX) {
+      return getExternalDrillTable(idxDesc);
+    }
+    return getNativeDrillTable(idxDesc);
+  }
+
+  /**
+   * When it is native index (index provided by native storage plugin),
+   * the actual IndexDiscover should provide the implementation to get the DrillTable object of index.
+   * Otherwise, we call IndexDiscoverable interface exposed from external storage plugin's SchemaFactory
+   * to get the desired DrillTable.
+   * This is implemented by the IndexDiscover within the storage plugin (e.g. HBase, MaprDB), since the
+   * implementations of AbstractStoragePlugin, IndexDescriptor and DrillTable in that storage plugin may
+   * have their own implementation details.
+   * @param idxDesc descriptor of the native index
+   * @return the DrillTable of the index
+   */
+  public abstract DrillTable getNativeDrillTable(IndexDescriptor idxDesc);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java
new file mode 100644
index 000000000..dbf5edc82
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexDiscoverable.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.exec.planner.logical.DrillTable;
+
+
+/**
+ * The SchemaFactory of a storage plugin that can be used to store index tables should expose this interface, so that
+ * IndexDiscovers can discover the index table without adding a dependency on the storage plugin.
+ */
+public interface IndexDiscoverable {
+
+ /**
+ * Return the found DrillTable with path (e.g. names={"elasticsearch", "staffidx", "stjson"}).
+ * @param discover the IndexDiscover performing the lookup
+ * @param desc descriptor of the index whose table is wanted
+ * @return the DrillTable found for the index
+ *         (NOTE(review): the result when no table is found is implementation-defined - confirm)
+ */
+ DrillTable findTable(IndexDiscover discover, DrillIndexDescriptor desc);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
new file mode 100644
index 000000000..ea34ea585
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexGroup.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Holds one or more IndexProperties that together represent a (non)covering index or a set of
+ * intersecting indexes. The held IndexProperties are what is used to rank this group against
+ * other IndexGroups.
+ */
+public class IndexGroup {
+  private final List<IndexProperties> indexProps = Lists.newArrayList();
+
+  /** Creates an empty group. */
+  public IndexGroup() {
+  }
+
+  /**
+   * @return true when this group holds more than one index, i.e. it represents an index intersection
+   */
+  public boolean isIntersectIndex() {
+    return indexProps.size() > 1;
+  }
+
+  /**
+   * @return the number of indexes held by this group
+   */
+  public int numIndexes() {
+    return indexProps.size();
+  }
+
+  /**
+   * Appends a single index to this group.
+   * @param prop the index properties to append
+   */
+  public void addIndexProp(IndexProperties prop) {
+    indexProps.add(prop);
+  }
+
+  /**
+   * Appends all given indexes to this group, in list order.
+   * @param prop the index properties to append
+   */
+  public void addIndexProp(List<IndexProperties> prop) {
+    indexProps.addAll(prop);
+  }
+
+  /**
+   * Removes the given index from this group.
+   * @param prop the index properties to remove
+   * @return true if the group contained the element and it was removed
+   */
+  public boolean removeIndexProp(IndexProperties prop) {
+    return indexProps.remove(prop);
+  }
+
+  /**
+   * @return the backing list of this group (not a copy, so changes to it affect the group)
+   */
+  public List<IndexProperties> getIndexProps() {
+    return indexProps;
+  }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java
new file mode 100644
index 000000000..cfdd6d030
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexProperties.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+
+import java.util.Map;
+
+/**
+ * IndexProperties encapsulates the various metrics of a single index that are related to
+ * the current query. These metrics are subsequently used to rank the index in comparison
+ * with other indexes.
+ * NOTE(review): how each metric is computed is defined by the implementing class, which is not
+ * part of this file - confirm there before relying on specific value ranges.
+ */
+public interface IndexProperties {
+
+ /**
+ * Supplies the query-specific inputs for this index.
+ * NOTE(review): parameter roles are inferred from their names and types - confirm with implementations.
+ * @param prefixMap map from index expression to the condition on it
+ * @param satisfiesCollation whether the index satisfies the required collation
+ * @param indexColumnsRemainderFilter the remainder filter on index columns
+ * @param stats the statistics to use
+ */
+ void setProperties(Map<LogicalExpression, RexNode> prefixMap,
+ boolean satisfiesCollation,
+ RexNode indexColumnsRemainderFilter,
+ Statistics stats);
+
+ double getLeadingSelectivity();
+
+ double getRemainderSelectivity();
+
+ boolean isCovering();
+
+ double getTotalRows();
+
+ /**
+ * @return the IndexDescriptor these properties belong to
+ */
+ IndexDescriptor getIndexDesc();
+
+ DrillScanRelBase getPrimaryTableScan();
+
+ RexNode getTotalRemainderFilter();
+
+ boolean satisfiesCollation();
+
+ void setSatisfiesCollation(boolean satisfiesCollation);
+
+ RelOptCost getSelfCost(RelOptPlanner planner);
+
+ int numLeadingFilters();
+
+ double getAvgRowSize();
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java
new file mode 100644
index 000000000..e71636973
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexStatistics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+
+
+import java.util.List;
+
+/**
+ * Statistics exposed for an index: approximate row count, collations and distribution.
+ */
+public interface IndexStatistics {
+ /** Returns the approximate number of rows in the table. */
+ double getRowCount();
+
+ /** Returns the collections of columns on which this table is sorted. */
+ List<RelCollation> getCollations();
+
+ /** Returns the distribution of the data in query result table. */
+ RelDistribution getDistribution();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java
new file mode 100644
index 000000000..a1a6fc882
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/IndexableExprMarker.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Marks the filter expressions that could be indexed.
+ * <p>
+ * Other than SchemaPaths, which represent columns of a table and could be indexed,
+ * we consider only function expressions, and specifically, the CAST function.
+ * To judge whether an expression is indexable, we check the following:
+ * <ol>
+ * <li>The expression must be one operand of a comparison operator, i.e. one of SqlKind.COMPARISON
+ *     (IN, EQUALS, NOT_EQUALS, LESS_THAN, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN_OR_EQUAL),
+ *     or of LIKE / SIMILAR.</li>
+ * <li>The expression tree must contain at least one inputRef (which means the expression is a
+ *     computation on top of at least one column). If more than one operand of the comparison
+ *     operator references an input, none of the operands is taken as indexable.</li>
+ * <li>(Limited to one level of function) the expression is a function call with no nested
+ *     function call underneath, except ITEM.</li>
+ * <li>(Limited to CAST) the function call is a CAST.</li>
+ * </ol>
+ */
+public class IndexableExprMarker extends RexVisitorImpl<Boolean> {
+
+  // map of rexNode -> converted LogicalExpression
+  final Map<RexNode, LogicalExpression> desiredExpressions = Maps.newHashMap();
+
+  // the expressions found in equality comparisons
+  final Map<RexNode, LogicalExpression> equalityExpressions = Maps.newHashMap();
+
+  // the expressions found in non-equality comparisons
+  final Map<RexNode, LogicalExpression> notInEquality = Maps.newHashMap();
+
+  // for =(cast(a.b as VARCHAR(len)), 'abcd'), if the 'len' is less than the max length of casted field on index table,
+  // we want to rewrite it to LIKE(cast(a.b as VARCHAR(len)), 'abcd%')
+  // map equalOnCastChar: key is the equal operator, value is the operand (cast(a.b as VARCHAR(10)))
+  final Map<RexNode, LogicalExpression> equalOnCastChar = Maps.newHashMap();
+
+  final private RelNode inputRel;
+
+  // flags the current recursive call state: whether we are on a direct operand of a comparison operator
+  boolean directCompareOp = false;
+
+  // the comparison call whose operand is currently being visited, or null when there is none
+  RexCall contextCall = null;
+
+  DrillParseContext parserContext;
+
+  /**
+   * Creates a marker for expressions evaluated on top of the given input.
+   *
+   * @param inputRel the rel whose fields the visited expressions refer to; also used to
+   *                 obtain the planner settings for converting expressions
+   */
+  public IndexableExprMarker(RelNode inputRel) {
+    super(true);
+    this.inputRel = inputRel;
+    parserContext = new DrillParseContext(PrelUtil.getPlannerSettings(inputRel.getCluster()));
+  }
+
+  /**
+   * @return an immutable copy of all operands marked as indexable so far, keyed by their RexNode
+   */
+  public Map<RexNode, LogicalExpression> getIndexableExpression() {
+    return ImmutableMap.copyOf(desiredExpressions);
+  }
+
+  /**
+   * @return an immutable copy of the map from an EQUALS call to its CAST-to-CHAR/VARCHAR operand
+   */
+  public Map<RexNode, LogicalExpression> getEqualOnCastChar() {
+    return ImmutableMap.copyOf(equalOnCastChar);
+  }
+
+  /**
+   * Returns the expressions that were only in an equality condition _and_ only once, e.g. ( a.b = 'value' ).
+   *
+   * @return the set of expressions that appear in exactly one equality comparison and in no
+   *         non-equality comparison
+   */
+  public Set<LogicalExpression> getExpressionsOnlyInEquality() {
+
+    Set<LogicalExpression> onlyInEquality = Sets.newHashSet();
+
+    Set<LogicalExpression> notInEqSet = Sets.newHashSet();
+
+    Set<LogicalExpression> inEqMoreThanOnce = Sets.newHashSet();
+
+    notInEqSet.addAll(notInEquality.values());
+
+    for (LogicalExpression expr : equalityExpressions.values()) {
+      // only process an expr that is not in any non-equality condition
+      if (!notInEqSet.contains(expr)) {
+
+        // an expr that appears in two or more equality conditions is ignored too
+        if (inEqMoreThanOnce.contains(expr)) {
+          continue;
+        }
+
+        // we have already recorded this expr in an equality condition, move it to inEqMoreThanOnce
+        if (onlyInEquality.contains(expr)) {
+          inEqMoreThanOnce.add(expr);
+          onlyInEquality.remove(expr);
+          continue;
+        }
+
+        // finally we could take this expr
+        onlyInEquality.add(expr);
+      }
+    }
+    return onlyInEquality;
+  }
+
+  /**
+   * An input reference is indexable only when it is a direct operand of a comparison.
+   */
+  @Override
+  public Boolean visitInputRef(RexInputRef rexInputRef) {
+    return directCompareOp;
+  }
+
+  /**
+   * Shallow check for a column reference: true for an input ref or an ITEM call.
+   *
+   * @param rex the expression to check
+   * @return true if {@code rex} itself is an input ref or an ITEM call
+   */
+  public boolean containInputRef(RexNode rex) {
+    if (rex instanceof RexInputRef) {
+      return true;
+    }
+    if ((rex instanceof RexCall) && "ITEM".equals(((RexCall) rex).getOperator().getName())) {
+      return true;
+    }
+    //TODO: use a visitor search recursively for inputRef, if found one return true
+    return false;
+  }
+
+  /**
+   * Checks whether the operands of the given call are candidates for marking.
+   *
+   * @param call the call to check
+   * @return true if the call is a comparison, LIKE or SIMILAR and at most one of its operands
+   *         references an input (as judged by {@link #containInputRef(RexNode)})
+   */
+  public boolean operandsAreIndexable(RexCall call) {
+    SqlKind kind = call.getKind();
+    boolean kindIsRight = (SqlKind.COMPARISON.contains(kind) || kind == SqlKind.LIKE || kind == SqlKind.SIMILAR);
+
+    if (!kindIsRight) {
+      return false;
+    }
+
+    int inputReference = 0;
+    for (RexNode operand : call.operands) {
+      // if two or more operands of this operator have an inputRef, it is something like
+      // a.b = a.c instead of a.b = 'hello', so an index cannot be applied
+      if (containInputRef(operand)) {
+        inputReference++;
+        if (inputReference >= 2) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Visits a call. For a comparison whose operands are indexable, each operand is visited as a
+   * direct comparison operand and recorded when it is found indexable. For a call that is itself
+   * a direct comparison operand, returns true for ITEM and for a CAST directly on top of an input
+   * ref or ITEM. Otherwise the operands are visited and false is returned.
+   */
+  @Override
+  public Boolean visitCall(RexCall call) {
+    if (call.getKind() == SqlKind.NOT) {
+      // Conditions under NOT are not indexable
+      return false;
+    }
+    if (operandsAreIndexable(call)) {
+      for (RexNode operand : call.operands) {
+        directCompareOp = true;
+        contextCall = call;
+        // RexVisitorImpl's default visit methods return null for node kinds this class does not
+        // override, so compare against TRUE instead of auto-unboxing a possibly null Boolean
+        boolean markIt = Boolean.TRUE.equals(operand.accept(this));
+        directCompareOp = false;
+        contextCall = null;
+        if (markIt) {
+          LogicalExpression expr = DrillOptiq.toDrill(parserContext, inputRel, operand);
+          desiredExpressions.put(operand, expr);
+          if (call.getKind() == SqlKind.EQUALS) {
+            equalityExpressions.put(operand, expr);
+          } else {
+            notInEquality.put(operand, expr);
+          }
+        }
+      }
+      return false;
+    }
+
+    // now we are handling a call directly under a comparison, e.g. <([call], literal)
+    if (directCompareOp) {
+      // if it is an ITEM, or a CAST function
+      if ("ITEM".equals(call.getOperator().getName())) {
+        return true;
+      } else if (call.getKind() == SqlKind.CAST) {
+        // For now, we care only about a direct CAST: the CAST's operand is a field (schemaPath),
+        // either an ITEM call (nested name) or an inputRef
+
+        // cast as char/varchar in an equals function
+        if (contextCall != null && contextCall.getKind() == SqlKind.EQUALS
+            && (call.getType().getSqlTypeName() == SqlTypeName.CHAR
+            || call.getType().getSqlTypeName() == SqlTypeName.VARCHAR)) {
+          equalOnCastChar.put(contextCall, DrillOptiq.toDrill(parserContext, inputRel, call));
+        }
+
+        RexNode castOp = call.operands.get(0);
+        if (castOp instanceof RexInputRef) {
+          return true;
+        }
+        if ((castOp instanceof RexCall) && ("ITEM".equals(((RexCall) castOp).getOperator().getName()))) {
+          return true;
+        }
+      }
+    }
+
+    // keep descending so that comparisons nested further down are still visited; the result of
+    // each operand is intentionally ignored (and never unboxed, as it may be null)
+    for (RexNode operand : call.operands) {
+      operand.accept(this);
+    }
+    return false;
+  }
+
+  @Override
+  public Boolean visitLocalRef(RexLocalRef localRef) {
+    return false;
+  }
+
+  @Override
+  public Boolean visitLiteral(RexLiteral literal) {
+    return false;
+  }
+
+  @Override
+  public Boolean visitOver(RexOver over) {
+    return false;
+  }
+
+  @Override
+  public Boolean visitCorrelVariable(RexCorrelVariable correlVariable) {
+    return false;
+  }
+
+  @Override
+  public Boolean visitDynamicParam(RexDynamicParam dynamicParam) {
+    return false;
+  }
+
+  @Override
+  public Boolean visitRangeRef(RexRangeRef rangeRef) {
+    return false;
+  }
+
+  /**
+   * Delegates to the expression the field is accessed on.
+   */
+  @Override
+  public Boolean visitFieldAccess(RexFieldAccess fieldAccess) {
+    final RexNode expr = fieldAccess.getReferenceExpr();
+    return expr.accept(this);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
new file mode 100644
index 000000000..c17d09f00
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/InvalidIndexDefinitionException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+/**
+ * Unchecked exception (a {@link DrillRuntimeException}) carrying a message about an
+ * invalid index definition.
+ */
+public class InvalidIndexDefinitionException extends DrillRuntimeException {
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param message the detail message, passed to the superclass constructor
+ */
+ public InvalidIndexDefinitionException(String message) {
+ super(message);
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java
new file mode 100644
index 000000000..2859102e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/Statistics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+
+/**
+ * Access to row count and average row size estimates for a table and its indexes,
+ * addressed by the identifier produced by {@link #buildUniqueIndexIdentifier(IndexDescriptor)}.
+ */
+public interface Statistics {
+
+ // sentinel row count; presumably reported when the row count cannot be determined -- confirm with implementations
+ double ROWCOUNT_UNKNOWN = -1;
+ //HUGE is same as DrillCostBase.HUGE
+ double ROWCOUNT_HUGE = Double.MAX_VALUE;
+ // sentinel average row size; presumably reported when the size cannot be determined -- confirm with implementations
+ double AVG_ROWSIZE_UNKNOWN = -1;
+ // NOTE(review): looks like a default per-column size estimate; the unit is not visible here -- confirm
+ long AVG_COLUMN_SIZE = 10;
+
+ /** Returns whether statistics are available. Should be called prior to using the statistics
+ * @return true if statistics are available
+ */
+ boolean isStatsAvailable();
+
+ /** Returns a unique index identifier
+ * @param idx - Index specified as a {@link IndexDescriptor}
+ * @return The unique index identifier
+ */
+ String buildUniqueIndexIdentifier(IndexDescriptor idx);
+
+ /** Returns the rowcount for the specified filter condition
+ * @param condition - Filter specified as a {@link RexNode}
+ * @param tabIdxName - The index name generated using {@code buildUniqueIndexIdentifier}
+ * @param scanRel - The current scan rel
+ * @return the rowcount for the filter
+ */
+ double getRowCount(RexNode condition, String tabIdxName, RelNode scanRel);
+
+ /** Returns the leading rowcount for the specified filter condition
+ * Leading rowcount means rowcount for filter condition only on leading index columns.
+ * @param condition - Filter specified as a {@link RexNode}
+ * @param tabIdxName - The index name generated using {@code buildUniqueIndexIdentifier}
+ * @param scanRel - The current scan rel
+ * @return the leading rowcount
+ */
+ double getLeadingRowCount(RexNode condition, String tabIdxName, RelNode scanRel);
+
+ /** Returns the average row size for the specified table or index
+ * @param tabIdxName - The index name generated using {@code buildUniqueIndexIdentifier}
+ * @param isIndexScan - Whether the current rel is an index scan (false for primary table)
+ * @return the average row size
+ */
+ double getAvgRowSize(String tabIdxName, boolean isIndexScan);
+
+ /** Initializes the statistics for the given filter condition and scan.
+ * NOTE(review): the meaning of the boolean result is not visible from this interface;
+ * presumably it reports whether initialization succeeded -- confirm with implementations.
+ * @param condition - Filter specified as a {@link RexNode}
+ * @param scanRel - The current scan rel
+ * @param context - The index planning call context
+ * @return implementation-defined status flag (see note above)
+ */
+ boolean initialize(RexNode condition, DrillScanRelBase scanRel, IndexCallContext context);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java
new file mode 100644
index 000000000..6894e4fdb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/index/StatisticsPayload.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.index;
+
+/**
+ * A bundle of three statistics values: row count, leading row count and average row size.
+ */
+public interface StatisticsPayload {
+ /** @return the row count */
+ double getRowCount();
+ /** @return the leading row count */
+ double getLeadingRowCount();
+ /** @return the average row size */
+ double getAvgRowSize();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 53036f1f4..ed9b32fe2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -91,6 +91,10 @@ public abstract class DrillTable implements Table {
this.options = options;
}
+ /**
+ * Sets the group scan held by this table, replacing any previous value.
+ *
+ * @param scan the group scan to store
+ */
+ public void setGroupScan(GroupScan scan) {
+ this.scan = scan;
+ }
+
public GroupScan getGroupScan() throws IOException{
if (scan == null) {
if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java
new file mode 100644
index 000000000..754c5d753
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PartitionFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.record.VectorWrapper;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * A function that maps a row of partition key values to a partition id.
+ * <p>
+ * The {@code @JsonTypeInfo} annotation makes Jackson record the concrete implementation
+ * class under the {@code "@class"} property.
+ */
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
+public interface PartitionFunction {
+
+ /**
+ * Return the list of FieldReferences that participate in the partitioning function
+ * @return list of FieldReferences
+ */
+ List<FieldReference> getPartitionRefList();
+
+ /**
+ * Setup method for the partitioning function
+ * @param partitionKeys a list of partition columns on which range partitioning is needed
+ */
+ void setup(List<VectorWrapper<?>> partitionKeys);
+
+ /**
+ * Evaluate a partitioning function for a particular row index and return the partition id
+ * @param index the integer index into the partition keys vector for a specific 'row' of values
+ * @param numPartitions the max number of partitions that are allowed
+ * @return partition id, an integer value
+ */
+ int eval(int index, int numPartitions);
+
+ /**
+ * Returns a FieldReference (LogicalExpression) for the partition function
+ * @return FieldReference for the partition function
+ */
+ FieldReference getPartitionFieldRef();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index eb6112dfc..cb790918a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -80,7 +80,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
}
- protected static enum BatchState {
+ public static enum BatchState {
/** Need to build schema and return. */
BUILD_SCHEMA,
/** This is still the first data batch. */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 9314da678..1bbbe76b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -101,6 +101,11 @@ public abstract class AbstractRecordReader implements RecordReader {
}
}
+ /**
+ * Default implementation: always reports that there is no more data to read.
+ *
+ * @return false
+ */
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
protected List<SchemaPath> getDefaultColumnsToRead() {
return GroupScan.ALL_COLUMNS;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index edd91d157..33b361c7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -43,6 +43,14 @@ public interface RecordReader extends AutoCloseable {
void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException;
/**
+ * Check if the reader may have potentially more data to be read in subsequent iterations. Certain types of readers
+ * such as repeatable readers can be invoked multiple times, so this method will allow ScanBatch to check with
+ * the reader before closing it.
+ * @return true if there could potentially be more reads, false otherwise
+ */
+ boolean hasNext();
+
+ /**
* Increments this record reader forward, writing via the provided output
* mutator into the output batch.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java
new file mode 100644
index 000000000..5f9eef801
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/EncodedSchemaPathSet.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+
+import org.apache.drill.shaded.guava.com.google.common.io.BaseEncoding;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This class provides utility methods to encode and decode a set of user specified
+ * SchemaPaths to a set of encoded SchemaPaths with the following properties.
+ * <ol>
+ * <li>Valid Drill identifier as per its grammar with only one, root name segment.
+ * <li>A single identifier can not exceed 1024 characters in length.
+ * </ol>
+ * <p>
+ * Format of the encoded SchemaPath:
+ * <blockquote><pre>$$ENC\d\d&lt;base32 encoded input paths&gt;</pre></blockquote>
+ * <p>
+ * We use Base-32 over Base-64 because the latter's charset includes '/' and '+'.
+ */
+public class EncodedSchemaPathSet {
+
+  private static final int ESTIMATED_ENCODED_SIZE = 1024;
+
+  private static final String ENC_PREFIX = "$$ENC";
+
+  private static final String ENC_FORMAT_STRING = ENC_PREFIX + "%02d%s";
+  private static final int ENC_PREFIX_SIZE = ENC_PREFIX.length() + "00".length();
+  private static final int MAX_ENC_IDENTIFIER_SIZE = (PlannerSettings.DEFAULT_IDENTIFIER_MAX_LENGTH - ENC_PREFIX_SIZE);
+  private static final int MAX_ENC_IDENTIFIER_COUNT = 100; // "$$ENC00*...$$ENC99*"
+
+  private static final BaseEncoding CODEC = BaseEncoding.base32().omitPadding(); // no-padding version
+
+  // NOTE: must stay below the constants above, since encode() reads them during static initialization
+  public static final String ENCODED_STAR_COLUMN = encode("*")[0];
+
+  /*
+   * Performance of various methods of encoding a Java String to UTF-8 keeps changing
+   * between releases, hence we'll encapsulate the actual methods within these functions
+   * and use them everywhere in Drill
+   */
+  private static final String UTF_8 = "utf-8";
+
+
+  /** Encodes the input string as UTF-8 bytes. */
+  private static byte[] encodeUTF(String input) {
+    try {
+      return input.getBytes(UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException(e); // should never come to this
+    }
+  }
+
+  /** Decodes the whole byte array as a UTF-8 string. */
+  private static String decodeUTF(byte[] input) {
+    try {
+      return new String(input, UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException(e); // should never come to this
+    }
+  }
+
+  /** Decodes {@code length} bytes starting at {@code offset} as a UTF-8 string. */
+  private static String decodeUTF(byte[] input, int offset, int length) {
+    try {
+      return new String(input, offset, length, UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      throw new DrillRuntimeException(e); // should never come to this
+    }
+  }
+
+  /**
+   * Returns the encoded array of SchemaPath identifiers from the input array of SchemaPath.
+   * <p>
+   * The returned identifiers have the following properties:
+   * <ul>
+   * <li>Each SchemaPath identifier in the array has only one single root NameSegment.</li>
+   * <li>Maximum length of each such identifier is equal to the maximum length of Drill identifier (currently 1024).</li>
+   * </ul>
+   * <p>
+   * A NUL byte is written in front of every path and serves as the separator between paths
+   * (the very first one is skipped when encoding). The scheme therefore relies on the paths
+   * not containing an embedded NUL character: the paths are encoded as standard UTF-8, in
+   * which U+0000 is a single zero byte.
+   *
+   * @param schemaPaths the schema paths to encode, at least one
+   * @return one or more encoded identifiers, each starting with {@code $$ENC} and a two digit sequence number
+   * @throws IllegalArgumentException if no schema path is provided
+   * @throws DrillRuntimeException if the encoded form does not fit into the maximum number of identifiers
+   */
+  public static String[] encode(final String... schemaPaths) {
+    Preconditions.checkArgument(schemaPaths != null && schemaPaths.length > 0,
+        "At least one schema path should be provided");
+
+    NoCopyByteArrayOutputStream out = new NoCopyByteArrayOutputStream(ESTIMATED_ENCODED_SIZE);
+    int bufOffset = 1; // 1st byte is NULL
+    for (String schemaPath : schemaPaths) {
+      out.write(0);
+      out.write(encodeUTF(schemaPath));
+    }
+    out.close();
+
+    final int bufLen = out.size() - 1; // not counting the first NULL byte
+    String encodedStr = CODEC.encode(out.getBuffer(), bufOffset, bufLen);
+    assert !encodedStr.endsWith("=") : String.format("Encoded string '%s' ends with '='", encodedStr);
+    return splitIdentifiers(encodedStr);
+  }
+
+  /**
+   * @param schemaPath the path to check, may be null
+   * @return true if the root segment name of the path starts with the encoding prefix
+   */
+  public static boolean isEncodedSchemaPath(SchemaPath schemaPath) {
+    return schemaPath != null && isEncodedSchemaPath(schemaPath.getRootSegment().getNameSegment().getPath());
+  }
+
+  /**
+   * @param schemaPath the path string to check, may be null
+   * @return true if the string starts with the encoding prefix
+   */
+  public static boolean isEncodedSchemaPath(String schemaPath) {
+    return schemaPath != null && schemaPath.startsWith(ENC_PREFIX);
+  }
+
+  /**
+   * Returns the decoded Collection of SchemaPath from the input which
+   * may contain a mix of encoded and non-encoded SchemaPaths.
+   * <p>
+   * The size of returned Collection is always equal to or greater than the
+   * input array.
+   * <p>
+   * The non-encoded SchemaPaths are collated in the beginning to the returned
+   * array, in the same order as that of the input array.
+   *
+   * @param encodedPaths the paths to decode, at least one
+   * @return the input collection itself if it holds no encoded path, otherwise an immutable
+   *         list of the decoded paths
+   */
+  public static Collection<SchemaPath> decode(final Collection<SchemaPath> encodedPaths) {
+    String[] schemaPathStrings = new String[encodedPaths.size()];
+    Iterator<SchemaPath> encodedPathsItr = encodedPaths.iterator();
+    for (int i = 0; i < schemaPathStrings.length; i++) {
+      SchemaPath schemaPath = encodedPathsItr.next();
+      if (schemaPath.getRootSegmentPath().startsWith(ENC_PREFIX)) {
+        // encoded schema path contains only root segment
+        schemaPathStrings[i] = schemaPath.getRootSegmentPath();
+      } else {
+        schemaPathStrings[i] = schemaPath.toExpr();
+      }
+    }
+    String[] decodedStrings = decode(schemaPathStrings);
+    // identity check: decode(String...) hands back its argument when nothing was encoded
+    if (decodedStrings == schemaPathStrings) {
+      return encodedPaths; // return the original collection as no encoded SchemaPath was found
+    } else {
+      ImmutableList.Builder<SchemaPath> builder = new ImmutableList.Builder<>();
+      for (String decodedString : decodedStrings) {
+        if ("*".equals(decodedString) || "`*`".equals(decodedString)) {
+          builder.add(SchemaPath.STAR_COLUMN);
+        } else {
+          builder.add(SchemaPath.parseFromString(decodedString));
+        }
+      }
+      return builder.build();
+    }
+  }
+
+  /**
+   * Returns the decoded array of SchemaPath strings from the input which
+   * may contain a mix of encoded and non-encoded SchemaPaths.
+   * <p>
+   * The size of returned array is always equal to or greater than the
+   * input array.
+   * <p>
+   * The non-encoded SchemaPaths are collated in the beginning to the returned
+   * array, in the same order as that of the input array. The input array is not modified.
+   *
+   * @param encodedPaths the paths to decode, at least one
+   * @return the input array itself if it holds no encoded content, otherwise a new array with
+   *         the non-encoded paths followed by the decoded paths
+   * @throws IllegalArgumentException if no path is provided
+   * @throws DrillRuntimeException if the encoded content is not valid Base-32
+   */
+  public static String[] decode(final String... encodedPaths) {
+    Preconditions.checkArgument(encodedPaths != null && encodedPaths.length > 0,
+        "At least one encoded path should be provided");
+
+    // Separate the encoded fragments from the plain paths; the plain paths keep their input order.
+    List<String> decodedPathList = Lists.newArrayList();
+    List<String> encodedFragmentList = Lists.newArrayList();
+    for (String encodedPath : encodedPaths) {
+      if (encodedPath.startsWith(ENC_PREFIX)) {
+        encodedFragmentList.add(encodedPath);
+      } else {
+        decodedPathList.add(encodedPath);
+      }
+    }
+
+    // As the encoded schema path move across components, they could get reordered.
+    // Sorting ensures that the original order is restored before concatenating the
+    // components back to the full encoded String. Only a private copy of the encoded
+    // fragments is sorted, so that the caller's array is left untouched.
+    String[] encodedFragments = encodedFragmentList.toArray(new String[encodedFragmentList.size()]);
+    Arrays.sort(encodedFragments);
+
+    StringBuilder sb = new StringBuilder(ESTIMATED_ENCODED_SIZE);
+    for (String encodedFragment : encodedFragments) {
+      sb.append(encodedFragment, ENC_PREFIX_SIZE, encodedFragment.length());
+    }
+
+    if (sb.length() == 0) {
+      // original list did not have any encoded path, return as is
+      return encodedPaths;
+    }
+
+    byte[] decodedBytes;
+    try {
+      decodedBytes = CODEC.decode(sb);
+    } catch (IllegalArgumentException e) {
+      throw new DrillRuntimeException(String.format(
+          "Unable to decode the input strings as encoded schema paths:\n%s", Arrays.asList(encodedPaths)), e);
+    }
+
+    // Split on the NUL separators. 'start' always moves past a separator, even when the
+    // segment in front of it is empty, so that no path is emitted with a leading NUL.
+    int start = 0;
+    for (int index = 0; index < decodedBytes.length; index++) {
+      if (decodedBytes[index] == 0) {
+        if (index - start > 0) {
+          decodedPathList.add(decodeUTF(decodedBytes, start, index - start));
+        }
+        start = index + 1;
+      }
+    }
+    if (decodedBytes.length - start > 0) {
+      String lastSchemaPath = decodeUTF(decodedBytes, start, decodedBytes.length - start).trim();
+      if (!lastSchemaPath.isEmpty()) {
+        decodedPathList.add(lastSchemaPath);
+      }
+    }
+    return decodedPathList.toArray(new String[decodedPathList.size()]);
+  }
+
+  /**
+   * Splits the input string so that the length of each encoded string,
+   * including the signature prefix, is less than or equal to
+   * {@code PlannerSettings.DEFAULT_IDENTIFIER_MAX_LENGTH}.
+   *
+   * @throws DrillRuntimeException if more than {@code MAX_ENC_IDENTIFIER_COUNT} pieces would be needed
+   */
+  private static String[] splitIdentifiers(String input) {
+    if (input.length() < MAX_ENC_IDENTIFIER_SIZE) {
+      return new String[] { String.format(ENC_FORMAT_STRING, 0, input) };
+    }
+    int splitsCount = (int) Math.ceil(input.length() / (double) MAX_ENC_IDENTIFIER_SIZE);
+    if (splitsCount > MAX_ENC_IDENTIFIER_COUNT) {
+      throw new DrillRuntimeException(String.format(
+          "Encoded size of the SchemaPath identifier '%s' exceeded maximum value.", input));
+    }
+    String[] result = new String[splitsCount];
+    for (int i = 0, startIdx = 0; i < result.length; i++, startIdx += MAX_ENC_IDENTIFIER_SIZE) {
+      // TODO: see if we can avoid memcpy due to input.substring() call
+      result[i] = String.format(ENC_FORMAT_STRING, i, input.substring(startIdx, Math.min(input.length(), startIdx + MAX_ENC_IDENTIFIER_SIZE)));
+    }
+    return result;
+  }
+
+  /**
+   * Optimized version of Java's ByteArrayOutputStream which returns the underlying
+   * byte array instead of making a copy
+   */
+  private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream {
+    public NoCopyByteArrayOutputStream(int size) {
+      super(size);
+    }
+
+    /** Returns the internal buffer itself; only the first {@link #size()} bytes are valid. */
+    public byte[] getBuffer() {
+      return buf;
+    }
+
+    @Override
+    public int size() {
+      return count;
+    }
+
+    @Override
+    public void write(int b) {
+      super.write(b);
+    }
+
+    // redeclared without 'throws IOException' so that callers need no try/catch
+    @Override
+    public void write(byte[] b) {
+      super.write(b, 0, b.length);
+    }
+
+    @Override
+    public void close() {
+      try {
+        super.close();
+      } catch (IOException e) {
+        throw new DrillRuntimeException(e); // should never come to this
+      }
+    }
+  }
+
+}