Diffstat (limited to 'exec/java-exec')
-rw-r--r--  exec/java-exec/pom.xml | 14
-rw-r--r--  exec/java-exec/src/main/codegen/includes/vv_imports.ftl | 3
-rw-r--r--  exec/java-exec/src/main/codegen/templates/FixedValueVectors.java | 4
-rw-r--r--  exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java | 5
-rw-r--r--  exec/java-exec/src/main/codegen/templates/SqlAccessors.java | 117
-rw-r--r--  exec/java-exec/src/main/codegen/templates/TypeHelper.java | 21
-rw-r--r--  exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java | 21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java | 35
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java | 112
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java | 7
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java | 22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java | 18
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java | 18
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java | 25
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java | 23
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 70
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java | 108
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java | 53
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java | 66
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java | 42
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java | 88
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java | 165
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java | 57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java | 73
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java | 59
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java | 200
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java | 36
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java | 98
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java | 46
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java | 33
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java | 92
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java | 62
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java | 43
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java | 76
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java | 100
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java | 51
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java | 45
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java | 107
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java | 71
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java | 48
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java | 57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java | 39
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java | 47
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java | 41
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java | 142
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java | 44
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java | 217
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java | 74
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java | 96
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java | 123
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java | 223
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java | 117
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java | 167
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java | 22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java | 35
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java | 123
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java | 67
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 131
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java | 11
-rw-r--r--  exec/java-exec/src/main/resources/storage-engines.json | 29
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java | 7
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java | 22
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java | 22
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java | 17
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java | 3
92 files changed, 4134 insertions(+), 251 deletions(-)
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 9ea8304ca..a016afc2e 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -44,12 +44,10 @@
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
- <version>2.9.1</version>
</dependency>
<dependency>
<groupId>xalan</groupId>
<artifactId>xalan</artifactId>
- <version>2.7.1</version>
</dependency>
<dependency>
<groupId>com.sun.codemodel</groupId>
@@ -58,12 +56,12 @@
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
- <artifactId>commons-compiler-jdk</artifactId>
- <version>2.6.1</version>
+ <artifactId>janino</artifactId>
+ <version>2.7.3</version>
</dependency>
<dependency>
<groupId>net.hydromatic</groupId>
- <artifactId>optiq</artifactId>
+ <artifactId>optiq-core</artifactId>
</dependency>
<dependency>
<groupId>org.freemarker</groupId>
@@ -139,8 +137,8 @@
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
- <version>4.0.7.Final</version>
- <artifactId>drill-netty-bufferl</artifactId>
+ <version>${project.version}</version>
+ <artifactId>drill-buffers</artifactId>
</dependency>
<dependency>
<groupId>org.apache.drill</groupId>
@@ -196,7 +194,7 @@
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
- <version>3.1</version>
+ <version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index 3e5a156f7..5083f74c8 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -18,6 +18,7 @@ import com.google.common.base.Charsets;
import com.google.common.collect.ObjectArrays;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
import org.apache.commons.lang3.ArrayUtils;
@@ -39,6 +40,8 @@ import java.util.Random;
import java.util.List;
import java.io.Closeable;
+import java.io.InputStream;
+import java.io.InputStreamReader;
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index c357dd6b9..2d81299ad 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -155,6 +155,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
return valueCount;
}
+ public boolean isNull(int index){
+ return false;
+ }
+
<#if (type.width > 8)>
public ${minor.javaType!type.javaType} get(int index) {
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 127c6fd48..4677374a7 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -243,7 +243,12 @@ package org.apache.drill.exec.vector;
</#if> get(int index, int positionIndex) {
return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex);
}
+
+ public boolean isNull(int index){
+ return false;
+ }
+
public void get(int index, Repeated${minor.class}Holder holder){
holder.start = offsets.getAccessor().get(index);
holder.end = offsets.getAccessor().get(index+1);
diff --git a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
new file mode 100644
index 000000000..efaf9a66d
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+<@pp.dropOutputFile />
+<#list vv.types as type>
+<#list type.minor as minor>
+<#list ["", "Nullable"] as mode>
+<#assign name = mode + minor.class?cap_first />
+<#assign javaType = (minor.javaType!type.javaType) />
+<@pp.changeOutputFile name="/org/apache/drill/exec/vector/accessor/${name}Accessor.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.vector.accessor;
+
+<#include "/@includes/vv_imports.ftl" />
+
+@SuppressWarnings("unused")
+public class ${name}Accessor extends AbstractSqlAccessor{
+ <#if mode == "Nullable">
+ private static final MajorType TYPE = Types.optional(MinorType.${minor.class?upper_case});
+ <#else>
+ private static final MajorType TYPE = Types.required(MinorType.${minor.class?upper_case});
+ </#if>
+
+ private final ${name}Vector.Accessor ac;
+
+ public ${name}Accessor(${name}Vector vector){
+ this.ac = vector.getAccessor();
+ }
+
+ public Object getObject(int index){
+ return ac.getObject(index);
+ }
+
+ <#if type.major == "VarLen">
+
+ @Override
+ public InputStream getStream(int index){
+ ${name}Holder h = new ${name}Holder();
+ ac.get(index, h);
+ return new ByteBufInputStream(h.buffer.slice(h.start, h.end));
+ }
+
+ @Override
+ public byte[] getBytes(int index){
+ return ac.get(index);
+ }
+
+ <#switch minor.class>
+ <#case "VarBinary">
+ <#break>
+ <#case "VarChar">
+ @Override
+ public InputStreamReader getReader(int index){
+ return new InputStreamReader(getStream(index), Charsets.UTF_8);
+ }
+
+ @Override
+ public String getString(int index){
+ return new String(getBytes(index), Charsets.UTF_8);
+ }
+
+
+ <#break>
+ <#case "Var16Char">
+ @Override
+ public InputStreamReader getReader(int index){
+ return new InputStreamReader(getStream(index), Charsets.UTF_16);
+ }
+
+ @Override
+ public String getString(int index){
+ return new String(getBytes(index), Charsets.UTF_16);
+ }
+
+
+ <#break>
+ <#default>
+ This is uncompilable code
+ </#switch>
+
+ <#else>
+ @Override
+ public ${javaType} get${javaType?cap_first}(int index){
+ return ac.get(index);
+ }
+ </#if>
+
+ @Override
+ public boolean isNull(int index){
+ return false;
+ }
+
+ @Override
+ MajorType getType(){return TYPE;};
+
+}
+
+
+</#list>
+</#list>
+</#list>
\ No newline at end of file
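
For orientation, expanding this template for the required VarChar case produces roughly the following class (a sketch of the generated output; the imports come from vv_imports.ftl above, and the holder/buffer types are those used throughout the templates):

public class VarCharAccessor extends AbstractSqlAccessor {
  private static final MajorType TYPE = Types.required(MinorType.VARCHAR);

  private final VarCharVector.Accessor ac;

  public VarCharAccessor(VarCharVector vector) {
    this.ac = vector.getAccessor();
  }

  public Object getObject(int index) {
    return ac.getObject(index);
  }

  @Override
  public InputStream getStream(int index) {
    VarCharHolder h = new VarCharHolder();
    ac.get(index, h);
    return new ByteBufInputStream(h.buffer.slice(h.start, h.end));
  }

  @Override
  public byte[] getBytes(int index) {
    return ac.get(index);
  }

  @Override
  public InputStreamReader getReader(int index) {
    return new InputStreamReader(getStream(index), Charsets.UTF_8);
  }

  @Override
  public String getString(int index) {
    return new String(getBytes(index), Charsets.UTF_8);
  }

  @Override
  public boolean isNull(int index) {
    return false;   // required-mode vectors never hold nulls
  }

  @Override
  MajorType getType() { return TYPE; }
}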
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 783f9435d..8c72cf76b 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -23,7 +23,7 @@
package org.apache.drill.exec.expr;
<#include "/@includes/vv_imports.ftl" />
-
+import org.apache.drill.exec.vector.accessor.*;
public class TypeHelper {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);
@@ -47,6 +47,25 @@ public class TypeHelper {
throw new UnsupportedOperationException();
}
+ public static SqlAccessor getSqlAccessor(ValueVector vector){
+ switch(vector.getField().getType().getMinorType()){
+ <#list vv.types as type>
+ <#list type.minor as minor>
+ case ${minor.class?upper_case}:
+ switch (vector.getField().getType().getMode()) {
+ case REQUIRED:
+ return new ${minor.class}Accessor((${minor.class}Vector) vector);
+ case OPTIONAL:
+ return new Nullable${minor.class}Accessor((Nullable${minor.class}Vector) vector);
+ case REPEATED:
+ throw new UnsupportedOperationException();
+ }
+ </#list>
+ </#list>
+ }
+ throw new UnsupportedOperationException();
+ }
+
public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
switch (type) {
<#list vv.types as type>
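With the getSqlAccessor dispatch above in place, callers can read any supported vector generically. A minimal usage sketch (the vector is assumed to come from a RecordBatch or VectorContainer, and getObject/isNull are assumed to be exposed through the SqlAccessor interface as the generated accessors define them):

// Hypothetical caller: iterate a vector's values through the accessor layer.
ValueVector vector = /* obtained from a record batch */ null;
SqlAccessor accessor = TypeHelper.getSqlAccessor(vector);
int count = vector.getAccessor().getValueCount();
for (int i = 0; i < count; i++) {
  Object value = accessor.isNull(i) ? null : accessor.getObject(i);
  // typed getters (getString, getBytes, getInt, ...) are also available per vector type
}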
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index b059d89fa..5cd83afec 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -225,14 +225,35 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
holder.buffer = data;
}
+
+ <#switch minor.class>
+ <#case "VarChar">
+ public Object getObject(int index) {
+ return new String(get(index), Charsets.UTF_8);
+ }
+ <#break>
+ <#case "Var16Char">
+ public Object getObject(int index) {
+ return new String(get(index), Charsets.UTF_16);
+ }
+ <#break>
+ <#default>
public Object getObject(int index) {
return get(index);
}
+
+ </#switch>
+
+
public int getValueCount() {
return valueCount;
}
+ public boolean isNull(int index){
+ return false;
+ }
+
public UInt${type.width}Vector getOffsetVector(){
return offsetVector;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 2f0ab2ea4..5f370ff34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -35,6 +35,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.DuplicateInstanceNameException;
import com.hazelcast.core.Hazelcast;
@@ -132,6 +133,10 @@ public class HazelCache implements DistributedCache {
@Override
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
IMap<String, V> imap = this.instance.getMap(clazz.toString());
+ MapConfig myMapConfig = new MapConfig();
+ myMapConfig.setBackupCount(0);
+ myMapConfig.setReadBackupData(true);
+ instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
return new HCDistributedMapImpl<V>(imap, clazz);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 74eed9424..1297cb343 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.Closeable;
@@ -37,11 +35,18 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.rpc.*;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.ChannelClosedException;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
@@ -52,7 +57,7 @@ import com.google.common.util.concurrent.SettableFuture;
/**
* Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
*/
-public class DrillClient implements Closeable{
+public class DrillClient implements Closeable, ConnectionThrottle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
DrillConfig config;
@@ -86,17 +91,28 @@ public class DrillClient implements Closeable{
return config;
}
+ @Override
+ public void setAutoRead(boolean enableAutoRead) {
+ client.setAutoRead(enableAutoRead);
+ }
+
+
+
/**
* Connects the client to a Drillbit server
*
* @throws IOException
*/
- public synchronized void connect() throws RpcException {
+ public void connect() throws RpcException {
+ connect((String) null);
+ }
+
+ public synchronized void connect(String connect) throws RpcException {
if (connected) return;
if (clusterCoordinator == null) {
try {
- this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+ this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
this.clusterCoordinator.start(10000);
} catch (Exception e) {
throw new RpcException("Failure setting up ZK for client.", e);
@@ -172,6 +188,11 @@ public class DrillClient implements Closeable{
return listener.getResults();
}
+ public void cancelQuery(QueryId id){
+ client.send(RpcType.CANCEL_QUERY, id, Ack.class);
+ }
+
+
/**
* Submits a Logical plan for direct execution (bypasses parsing)
*
@@ -217,7 +238,7 @@ public class DrillClient implements Closeable{
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
// logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
results.add(result);
if(result.getHeader().getIsLastChunk()){
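The ConnectionThrottle now threaded through resultArrived lets a listener apply backpressure instead of buffering results without bound. A minimal sketch of the pattern (the queue and threshold are illustrative, not part of this change; other UserResultsListener methods omitted):

// Fragment of a UserResultsListener implementation that uses the throttle.
private final java.util.concurrent.LinkedBlockingQueue<QueryResultBatch> queue =
    new java.util.concurrent.LinkedBlockingQueue<>();
private static final int SOFT_LIMIT = 100;   // illustrative threshold

@Override
public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
  queue.add(result);
  if (queue.size() >= SOFT_LIMIT) {
    throttle.setAutoRead(false);   // stop netty from reading more off the socket
  }
}

// Consumer side: re-enable reads once the backlog drains.
public QueryResultBatch next(ConnectionThrottle throttle) throws InterruptedException {
  QueryResultBatch batch = queue.take();
  if (queue.size() < SOFT_LIMIT / 2) {
    throttle.setAutoRead(true);
  }
  return batch;
}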
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 7adefdb10..b07b3aee7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -17,43 +17,33 @@
*/
package org.apache.drill.exec.client;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.base.Stopwatch;
-import com.google.common.io.Resources;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.StringUtils;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.RemoteRpcException;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.VectorUtil;
-import org.apache.drill.exec.vector.ValueVector;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
public class QuerySubmitter {
@@ -99,42 +89,50 @@ public class QuerySubmitter {
public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception {
DrillConfig config = DrillConfig.create();
- DrillClient client;
- if (local) {
- RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
- Drillbit[] drillbits = new Drillbit[bits];
- for (int i = 0; i < bits; i++) {
- drillbits[i] = new Drillbit(config, serviceSet);
- drillbits[i].run();
+ DrillClient client = null;
+
+ try{
+ if (local) {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit[] drillbits = new Drillbit[bits];
+ for (int i = 0; i < bits; i++) {
+ drillbits[i] = new Drillbit(config, serviceSet);
+ drillbits[i].run();
+ }
+ client = new DrillClient(config, serviceSet.getCoordinator());
+ } else {
+ ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum);
+ clusterCoordinator.start(10000);
+ client = new DrillClient(config, clusterCoordinator);
}
- client = new DrillClient(config, serviceSet.getCoordinator());
- } else {
- ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum);
- clusterCoordinator.start(10000);
- client = new DrillClient(config, clusterCoordinator);
- }
- client.connect();
- QueryResultsListener listener = new QueryResultsListener();
- String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
- UserProtos.QueryType queryType;
- type = type.toLowerCase();
- switch(type) {
- case "logical":
- queryType = UserProtos.QueryType.LOGICAL;
- break;
- case "physical":
- queryType = UserProtos.QueryType.PHYSICAL;
- break;
- default:
- System.out.println("Invalid query type: " + type);
- return -1;
+ client.connect();
+ QueryResultsListener listener = new QueryResultsListener();
+ String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
+ UserProtos.QueryType queryType;
+ type = type.toLowerCase();
+ switch(type) {
+ case "sql":
+ queryType = UserProtos.QueryType.SQL;
+ break;
+ case "logical":
+ queryType = UserProtos.QueryType.LOGICAL;
+ break;
+ case "physical":
+ queryType = UserProtos.QueryType.PHYSICAL;
+ break;
+ default:
+ System.out.println("Invalid query type: " + type);
+ return -1;
+ }
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ client.runQuery(queryType, plan, listener);
+ int rows = listener.await();
+ System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000));
+ return 0;
+ }finally{
+ if(client != null) client.close();
}
- Stopwatch watch = new Stopwatch();
- watch.start();
- client.runQuery(queryType, plan, listener);
- int rows = listener.await();
- System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000));
- return 0;
}
private class QueryResultsListener implements UserResultsListener {
@@ -150,7 +148,7 @@ public class QuerySubmitter {
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount();
if (result.getData() != null) {
count.addAndGet(rows);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
index 904afb5c4..080679b87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
@@ -112,10 +112,7 @@ class MergeAdapter extends ClassVisitor {
return null;
}
if(arg3 != null){
- System.out.println("a: " + arg3);
arg3 = arg3.replace(set.precompiled.slash, set.generated.slash);
- System.out.println("b: " + arg3);
-
}
// if( (access & Modifier.PUBLIC) == 0){
// access = access ^ Modifier.PUBLIC ^ Modifier.PROTECTED | Modifier.PRIVATE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 36433ad29..cbd722c03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -24,16 +24,18 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.record.NullExpression;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
import com.google.common.collect.Lists;
-import org.apache.drill.exec.record.VectorAccessible;
-
public class ExpressionTreeMaterializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
@@ -45,7 +47,12 @@ public class ExpressionTreeMaterializer {
public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry) {
LogicalExpression materializedExpr = expr.accept(new MaterializeVisitor(batch, errorCollector), null);
- return ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry);
+ LogicalExpression out = ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry);
+ if(out instanceof NullExpression){
+ return new TypedNullConstant(Types.optional(MinorType.INT));
+ }else{
+ return out;
+ }
}
private static class MaterializeVisitor extends SimpleExprVisitor<LogicalExpression> {
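The practical effect of the new null handling, as a sketch: materialization can no longer hand an untyped null to code generation.

LogicalExpression out = ExpressionTreeMaterializer.materialize(expr, batch, collector, registry);
// If expr resolved to NullExpression (e.g. a reference to a missing field),
// out is now TypedNullConstant(Types.optional(MinorType.INT)), so downstream
// code generation always sees a concrete, nullable type.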
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index fd965a704..80a78191d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import net.hydromatic.optiq.SchemaPlus;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
@@ -75,6 +77,7 @@ public class FragmentContext implements Closeable {
this.connection = connection;
this.fragment = fragment;
this.funcRegistry = funcRegistry;
+ logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
}
@@ -91,6 +94,10 @@ public class FragmentContext implements Closeable {
public DrillbitContext getDrillbitContext() {
return context;
}
+
+ public SchemaPlus getRootSchema(){
+ return null;
+ }
/**
* Get this node's identity.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 256e7729a..11658e63c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -21,9 +21,11 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FunctionRegistry;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.sql.DrillSchemaFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -81,4 +83,13 @@ public class QueryContext {
return workBus;
}
+ public DrillSchemaFactory getSchemaFactory(){
+ return drillbitContext.getSchemaFactory();
+ }
+
+ public FunctionRegistry getFunctionRegistry(){
+ return drillbitContext.getFunctionRegistry();
+
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 6e58ec778..cd59428f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
@@ -33,7 +32,6 @@ import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.common.logical.data.*;
-import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
@@ -50,6 +48,8 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.store.StorageEngine;
+import org.eigenbase.rel.RelFieldCollation.Direction;
+import org.eigenbase.rel.RelFieldCollation.NullDirection;
import com.beust.jcommander.internal.Lists;
@@ -123,9 +123,9 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException {
PhysicalOperator input = order.getInput().accept(this, value);
- List<OrderDef> ods = Lists.newArrayList();
+ List<Ordering> ods = Lists.newArrayList();
for(Ordering o : order.getOrderings()){
- ods.add(OrderDef.create(o));
+ ods.add(o);
}
return new SelectionVectorRemover(new Sort(input, ods, false));
}
@@ -150,13 +150,13 @@ public class BasicOptimizer extends Optimizer{
}
// a collapsing aggregate is currently implemented as a sort followed by a streaming aggregate.
- List<OrderDef> orderDefs = Lists.newArrayList();
+ List<Ordering> orderDefs = Lists.newArrayList();
List<NamedExpression> keys = Lists.newArrayList();
for(LogicalExpression e : segment.getExprs()){
if( !(e instanceof SchemaPath)) throw new OptimizerException("The basic optimizer doesn't currently support collapsing aggregate where the segment value is something other than a SchemaPath.");
keys.add(new NamedExpression(e, new FieldReference((SchemaPath) e)));
- orderDefs.add(new OrderDef(Direction.ASC, e));
+ orderDefs.add(new Ordering(Direction.Ascending, e, NullDirection.FIRST));
}
Sort sort = new Sort(segment.getInput().accept(this, value), orderDefs, false);
@@ -169,22 +169,22 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException {
PhysicalOperator leftOp = join.getLeft().accept(this, value);
- List<OrderDef> leftOrderDefs = Lists.newArrayList();
+ List<Ordering> leftOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
- leftOrderDefs.add(new OrderDef(Direction.ASC, jc.getLeft()));
+ leftOrderDefs.add(new Ordering(Direction.Ascending, jc.getLeft()));
}
leftOp = new Sort(leftOp, leftOrderDefs, false);
leftOp = new SelectionVectorRemover(leftOp);
PhysicalOperator rightOp = join.getRight().accept(this, value);
- List<OrderDef> rightOrderDefs = Lists.newArrayList();
+ List<Ordering> rightOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
- rightOrderDefs.add(new OrderDef(Direction.ASC, jc.getRight()));
+ rightOrderDefs.add(new Ordering(Direction.Ascending, jc.getRight()));
}
rightOp = new Sort(rightOp, rightOrderDefs, false);
rightOp = new SelectionVectorRemover(rightOp);
- MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJointType());
+ MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
return new SelectionVectorRemover(mjp);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 76916ea44..fde88a9b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -21,13 +21,14 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.Join.JoinType;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.sql.SqlJoinOperator.JoinType;
import com.beust.jcommander.internal.Lists;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -44,7 +45,7 @@ public class MergeJoinPOP extends AbstractBase{
private final PhysicalOperator left;
private final PhysicalOperator right;
private final List<JoinCondition> conditions;
- private final Join.JoinType joinType;
+ private final JoinRelType joinType;
@Override
public OperatorCost getCost() {
@@ -56,7 +57,7 @@ public class MergeJoinPOP extends AbstractBase{
@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("join-conditions") List<JoinCondition> conditions,
- @JsonProperty("join-type") Join.JoinType joinType
+ @JsonProperty("join-type") JoinRelType joinType
) {
this.left = left;
this.right = right;
@@ -93,7 +94,7 @@ public class MergeJoinPOP extends AbstractBase{
return right;
}
- public Join.JoinType getJoinType() {
+ public JoinRelType getJoinType() {
return joinType;
}
@@ -102,12 +103,12 @@ public class MergeJoinPOP extends AbstractBase{
}
public MergeJoinPOP flipIfRight(){
- if(joinType == JoinType.RIGHT){
+ if(joinType == JoinRelType.RIGHT){
List<JoinCondition> flippedConditions = Lists.newArrayList(conditions.size());
for(JoinCondition c : conditions){
flippedConditions.add(c.flip());
}
- return new MergeJoinPOP(right, left, flippedConditions, JoinType.LEFT);
+ return new MergeJoinPOP(right, left, flippedConditions, JoinRelType.LEFT);
}else{
return this;
}
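flipIfRight means the execution layer only has to implement the LEFT outer variant: a RIGHT join is rewritten as a LEFT join over swapped inputs with each condition flipped. Illustrative usage (the operators and conditions here are placeholders):

MergeJoinPOP pop = new MergeJoinPOP(scanA, scanB, conditions, JoinRelType.RIGHT);
MergeJoinPOP flipped = pop.flipIfRight();
// flipped.getLeft() == scanB, flipped.getRight() == scanA,
// every condition has been flip()ed, and flipped.getJoinType() == JoinRelType.LEFT.
// For any non-RIGHT join type, flipIfRight() returns the operator unchanged.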
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 667cc33a4..549c65c1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -17,18 +17,18 @@
*/
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
-import org.apache.drill.common.expression.LogicalExpression;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractReceiver;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
// The goal of this operator is to produce outgoing batches with records
// ordered according to the supplied expression. Each incoming batch
@@ -39,12 +39,12 @@ public class MergingReceiverPOP extends AbstractReceiver{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverPOP.class);
private final List<DrillbitEndpoint> senders;
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
@JsonCreator
public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("senders") List<DrillbitEndpoint> senders,
- @JsonProperty("orderings") List<OrderDef> orderings) {
+ @JsonProperty("orderings") List<Ordering> orderings) {
super(oppositeMajorFragmentId);
this.senders = senders;
this.orderings = orderings;
@@ -78,7 +78,7 @@ public class MergingReceiverPOP extends AbstractReceiver{
return new Size(1,1);
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
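The merge this receiver describes is the classic k-way merge of pre-sorted streams; the MergingRecordBatch below drives it with a PriorityQueue of batch cursors. A self-contained sketch of the idea over plain iterators (not Drill's vector-based implementation):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class KWayMerge {
  private static final class Cursor implements Comparable<Cursor> {
    int value;                          // current head of this input
    final Iterator<Integer> source;
    Cursor(Iterator<Integer> source) { this.source = source; this.value = source.next(); }
    @Override public int compareTo(Cursor o) { return Integer.compare(value, o.value); }
  }

  /** Merges already-sorted inputs into one sorted output list. */
  static List<Integer> merge(List<Iterator<Integer>> inputs) {
    PriorityQueue<Cursor> pq = new PriorityQueue<>();
    for (Iterator<Integer> it : inputs) {
      if (it.hasNext()) pq.add(new Cursor(it));
    }
    List<Integer> out = new ArrayList<>();
    while (!pq.isEmpty()) {
      Cursor c = pq.poll();             // smallest head across all senders
      out.add(c.value);
      if (c.source.hasNext()) {         // advance that sender and re-enqueue
        c.value = c.source.next();
        pq.add(c);
      }
    }
    return out;
  }
}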
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index f0aa85bdc..c49509f40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -17,27 +17,27 @@
*/
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
@JsonTypeName("ordered-partition-exchange")
public class OrderedPartitionExchange extends AbstractExchange {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionExchange.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private final FieldReference ref;
private int recordsToSample = 10000; // How many records must be received before analyzing
private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of partitions to send to distributed cache
@@ -48,7 +48,7 @@ public class OrderedPartitionExchange extends AbstractExchange {
private List<DrillbitEndpoint> receiverLocations;
@JsonCreator
- public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref,
+ public OrderedPartitionExchange(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref,
@JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample,
@JsonProperty("samplingFactor") Integer samplingFactor, @JsonProperty("completionFactor") Float completionFactor) {
super(child);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index de5cf04ef..55632a214 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -17,24 +17,27 @@
*/
package org.apache.drill.exec.physical.config;
-import com.beust.jcommander.internal.Lists;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("OrderedPartitionSender")
public class OrderedPartitionSender extends AbstractSender {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionSender.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private final FieldReference ref;
private final List<DrillbitEndpoint> endpoints;
private final int sendingWidth;
@@ -44,7 +47,7 @@ public class OrderedPartitionSender extends AbstractSender {
private float completionFactor;
@JsonCreator
- public OrderedPartitionSender(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
+ public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
@JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample,
@JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) {
@@ -70,7 +73,7 @@ public class OrderedPartitionSender extends AbstractSender {
return endpoints;
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index fbeb6dfb6..b8073c175 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -18,25 +18,25 @@
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.proto.CoordinationProtos;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("single-merge-exchange")
public class SingleMergeExchange extends AbstractExchange {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleMergeExchange.class);
- private final List<OrderDef> orderExpr;
+ private final List<Ordering> orderExpr;
// ephemeral for setup tasks
private List<CoordinationProtos.DrillbitEndpoint> senderLocations;
@@ -44,7 +44,7 @@ public class SingleMergeExchange extends AbstractExchange {
@JsonCreator
public SingleMergeExchange(@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("orderings") List<OrderDef> orderExpr) {
+ @JsonProperty("orderings") List<Ordering> orderExpr) {
super(child);
this.orderExpr = orderExpr;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 206e7cd4a..82aa830cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.config;
import java.util.List;
-import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -27,7 +27,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -35,17 +34,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class Sort extends AbstractSingle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private boolean reverse = false;
@JsonCreator
- public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
+ public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
super(child);
this.orderings = orderings;
this.reverse = reverse;
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index aae1a3c21..7c8a51cad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.record.VectorContainer;
+import org.eigenbase.rel.JoinRelType;
/**
* This join template uses a merge join to combine two ordered streams into a single larger batch. When joining
@@ -91,7 +92,7 @@ public abstract class JoinTemplate implements JoinWorker {
// validate input iterators (advancing to the next record batch if necessary)
if (!status.isRightPositionAllowed()) {
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
+ if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
@@ -109,7 +110,7 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
+ if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT)
if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
return false;
}
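Stripped of batching and code generation, the comparison-driven loop the template implements looks like this (a simplified sketch over sorted int keys; -1 stands in for a null right side, and duplicate-key handling is omitted):

// needs java.util.ArrayList and java.util.List
static List<int[]> mergeJoin(int[] left, int[] right, boolean leftOuter) {
  List<int[]> out = new ArrayList<>();
  int l = 0, r = 0;
  while (l < left.length && r < right.length) {
    if (left[l] < right[r]) {
      if (leftOuter) out.add(new int[]{left[l], -1});  // left key unmatched
      l++;
    } else if (left[l] > right[r]) {
      r++;                                             // this right key can never match now
    } else {
      out.add(new int[]{left[l], right[r]});           // keys equal: emit joined row
      l++; r++;
    }
  }
  while (leftOuter && l < left.length) {
    out.add(new int[]{left[l++], -1});                 // drain remaining left rows
  }
  return out;
}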
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 298b031f8..bd668e748 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.JoinRelType;
import com.google.common.collect.ImmutableList;
import com.sun.codemodel.JClass;
@@ -112,7 +113,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final RecordBatch right;
private final JoinStatus status;
private final JoinCondition condition;
- private final Join.JoinType joinType;
+ private final JoinRelType joinType;
private JoinWorker worker;
public MergeJoinBatchBuilder batchBuilder;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 9428b4650..3549a33bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -17,16 +17,16 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import com.google.common.base.Preconditions;
+import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.data.Join.JoinType;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
+import org.eigenbase.rel.JoinRelType;
-import java.util.List;
+import com.google.common.base.Preconditions;
public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
@@ -34,7 +34,7 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
@Override
public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
- if(config.getJoinType() == JoinType.RIGHT){
+ if(config.getJoinType() == JoinRelType.RIGHT){
return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0));
}else{
return new MergeJoinBatch(config, context, children.get(0), children.get(1));
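
A sketch of the flip-if-right trick used above, with hypothetical names: a RIGHT join can be executed as a LEFT join whose inputs are swapped, so the merge-join implementation only ever has to handle the LEFT case.

    public class FlipIfRightSketch {
        enum JoinRelType { INNER, LEFT, RIGHT }
        static String plan(JoinRelType type, String left, String right) {
            if (type == JoinRelType.RIGHT) {
                return plan(JoinRelType.LEFT, right, left); // swap children, flip join type
            }
            return type + "(" + left + ", " + right + ")";
        }
        public static void main(String[] args) {
            System.out.println(plan(JoinRelType.RIGHT, "L", "R")); // prints LEFT(R, L)
        }
    }
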
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 43d3b457b..72f1ad93b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -25,14 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Direction;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -60,6 +59,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -367,7 +367,7 @@ public class MergingRecordBatch implements RecordBatch {
private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
// set up the expression evaluator and code generation
- final List<OrderDef> orderings = config.getOrderings();
+ final List<Ordering> orderings = config.getOrderings();
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<MergingReceiverGeneratorBase> cg =
CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -550,11 +550,11 @@ public class MergingRecordBatch implements RecordBatch {
// generate less than/greater than checks (fixing results for ASCending vs. DESCending)
cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1)))
._then()
- ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? 1 : -1));
+ ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? 1 : -1));
cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1)))
._then()
- ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? -1 : 1));
+ ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? -1 : 1));
++comparisonVectorIndex;
}
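
A self-contained sketch of the k-way merge this receiver performs, assuming one comparator per ordering; Direction.Descending simply negates the comparison, which is the same fix-up the generated ASC/DESC checks above apply.

    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
        enum Direction { Ascending, Descending }
        public static void main(String[] args) {
            final Direction dir = Direction.Ascending;
            Comparator<Integer> cmp = (a, b) ->
                dir == Direction.Ascending ? Integer.compare(a, b) : Integer.compare(b, a);
            PriorityQueue<Integer> pq = new PriorityQueue<>(cmp);
            pq.addAll(List.of(5, 1, 3)); // heads of three ordered incoming streams
            while (!pq.isEmpty()) System.out.print(pq.poll() + " "); // prints: 1 3 5
        }
    }
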
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index da8978f39..381fbe24b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,7 +23,6 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -32,6 +31,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.cache.Counter;
@@ -69,6 +69,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -189,7 +190,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
// Uses the
- // the expressions from the OrderDefs to populate each column. There is one column for each OrderDef in
+ // expressions from the Orderings to populate each column. There is one column for each Ordering in
// popConfig.orderings.
VectorContainer containerToCache = new VectorContainer();
@@ -293,11 +294,11 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
VectorContainer allSamplesContainer = new VectorContainer();
containerBuilder.build(context, allSamplesContainer);
- List<OrderDef> orderDefs = Lists.newArrayList();
+ List<Ordering> orderDefs = Lists.newArrayList();
int i = 0;
- for (OrderDef od : popConfig.getOrderings()) {
+ for (Ordering od : popConfig.getOrderings()) {
SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
- orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp)));
+ orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
}
// sort the data incoming samples.
@@ -330,8 +331,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
/**
* Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer
- * outgoing. Each OrderDef in orderings generates a column, and evaluation of the expression associated with each
- * OrderDef determines the value of each column. These records will later be sorted based on the values in each
+ * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
+ * Ordering determines the value of each column. These records will later be sorted based on the values in each
* column, in the same order as the orderings.
*
* @param sv4
@@ -342,14 +343,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* @throws SchemaChangeException
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
- List<OrderDef> orderings) throws SchemaChangeException {
+ List<Ordering> orderings) throws SchemaChangeException {
List<ValueVector> localAllocationVectors = Lists.newArrayList();
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
int i = 0;
- for (OrderDef od : orderings) {
+ for (Ordering od : orderings) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
SchemaPath schemaPath = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
@@ -521,7 +522,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
cg.setMappingSet(mainMapping);
int count = 0;
- for (OrderDef od : popConfig.getOrderings()) {
+ for (Ordering od : popConfig.getOrderings()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
if (collector.hasErrors())
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
@@ -538,7 +539,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
ClassGenerator.HoldingContainer out = cg.addExpr(f, false);
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if (od.getDirection() == Order.Direction.ASC) {
+ if (od.getDirection() == Direction.Ascending) {
jc._then()._return(out.getValue());
} else {
jc._then()._return(out.getValue().minus());
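
A sketch of the sampling arithmetic described in the comment above: every Nth record is projected into the sample container, with N = recordsSampled / (samplingFactor * partitions). The concrete numbers below are hypothetical.

    public class SampleStepSketch {
        public static void main(String[] args) {
            int recordsSampled = 1000, samplingFactor = 10, partitions = 4;
            int n = recordsSampled / (samplingFactor * partitions); // N = 25
            int copied = 0;
            for (int i = 0; i < recordsSampled; i += n) {
                copied++; // copy record i; one output column per Ordering expression
            }
            System.out.println("step N = " + n + ", sampled rows = " + copied); // N = 25, 40 rows
        }
    }
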
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 3b8154efd..509d13b41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -88,6 +88,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
return ref;
}
+ private boolean isWildcard(NamedExpression ex){
+ LogicalExpression expr = ex.getExpr();
+ LogicalExpression ref = ex.getRef();
+ if(expr instanceof SchemaPath && ref instanceof SchemaPath){
+ PathSegment e = ((SchemaPath) expr).getRootSegment();
+ PathSegment n = ((SchemaPath) ref).getRootSegment();
+ if(e.isNamed() && e.getNameSegment().getPath().equals("*") && n.isNamed() && n.getChild() != null && n.getChild().isNamed() && n.getChild().getNameSegment().getPath().equals("*")){
+ return true;
+ }
+ }
+ return false;
+ }
@Override
protected void setupNewSchema() throws SchemaChangeException{
@@ -99,33 +111,43 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- for(int i = 0; i < exprs.size(); i++){
- final NamedExpression namedExpression = exprs.get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
- final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
- if(collector.hasErrors()){
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
- }
-
- // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
- ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
- Preconditions.checkNotNull(incoming);
-
- TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+ if(exprs.size() == 1 && isWildcard(exprs.get(0))){
+ for(VectorWrapper<?> wrapper : incoming){
+ ValueVector vvIn = wrapper.getValueVector();
+ TransferPair tp = wrapper.getValueVector().getTransferPair(new FieldReference(vvIn.getField().getName()));
transfers.add(tp);
container.add(tp.getTo());
- logger.debug("Added transfer.");
- }else{
- // need to do evaluation.
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
- allocationVectors.add(vector);
- TypedFieldId fid = container.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
- cg.addExpr(write);
- logger.debug("Added eval.");
}
+ }else{
+ for(int i = 0; i < exprs.size(); i++){
+ final NamedExpression namedExpression = exprs.get(i);
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
+ final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
+ if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
+ ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+ ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
+ Preconditions.checkNotNull(incoming);
+
+ TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+ transfers.add(tp);
+ container.add(tp.getTo());
+ logger.debug("Added transfer.");
+ }else{
+ // need to do evaluation.
+ ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ allocationVectors.add(vector);
+ TypedFieldId fid = container.add(vector);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
+ cg.addExpr(write);
+ logger.debug("Added eval.");
+ }
+ }
+
}
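
A hypothetical sketch of the new wildcard fast path: when the project's only expression is "*", every incoming vector is handed through via a transfer pair instead of generating per-column evaluation code.

    import java.util.List;

    public class WildcardPathSketch {
        static String plan(List<String> exprs) {
            if (exprs.size() == 1 && "*".equals(exprs.get(0))) {
                return "transfer-all"; // one TransferPair per incoming vector, no codegen
            }
            return "evaluate";         // materialize and evaluate each named expression
        }
        public static void main(String[] args) {
            System.out.println(plan(List.of("*")));      // transfer-all
            System.out.println(plan(List.of("a", "b"))); // evaluate
        }
    }
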
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 4d0473558..5fd12c000 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -20,13 +20,11 @@ package org.apache.drill.exec.physical.impl.sort;
import java.io.IOException;
import java.util.List;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -46,6 +44,7 @@ import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.collect.ImmutableList;
import com.sun.codemodel.JConditional;
@@ -165,20 +164,20 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return createNewSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
}
- public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
+ public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
}
- public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
+ public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
throws ClassTransformationException, IOException, SchemaChangeException{
CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
ClassGenerator<Sorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(OrderDef od : orderings){
+ for(Ordering od : orderings){
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
@@ -194,7 +193,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
HoldingContainer out = g.addExpr(f, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASC){
+ if(od.getDirection() == Direction.Ascending){
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
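
A minimal sketch of the ASC/DESC fix-up the generated sorter applies above: the first non-zero comparison result is returned as-is for Direction.Ascending and negated otherwise.

    public class SortDirectionSketch {
        enum Direction { Ascending, Descending }
        static int fix(int rawCompare, Direction d) {
            return d == Direction.Ascending ? rawCompare : -rawCompare; // out.getValue().minus()
        }
        public static void main(String[] args) {
            System.out.println(fix(1, Direction.Descending)); // prints -1
        }
    }
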
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
new file mode 100644
index 000000000..b4c1bf9ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.BitSet;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Aggregation implemented in Drill.
+ */
+public class DrillAggregateRel extends AggregateRelBase implements DrillRel {
+ /** Creates a DrillAggregateRel. */
+ public DrillAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+ List<AggregateCall> aggCalls) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls);
+ for (AggregateCall aggCall : aggCalls) {
+ if (aggCall.isDistinct()) {
+ throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
+ }
+ }
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ try {
+ return new DrillAggregateRel(getCluster(), traitSet, sole(inputs), getGroupSet(), aggCalls);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+
+ GroupingAggregate.Builder builder = GroupingAggregate.builder();
+ builder.setInput(implementor.visitChild(this, 0, getChild()));
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ final List<String> fields = getRowType().getFieldNames();
+
+ for (int group : BitSets.toIter(groupSet)) {
+ FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+ builder.addKey(fr, fr);
+ }
+
+ for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+ FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
+ LogicalExpression expr = toDrill(aggCall.e, childFields, implementor);
+ builder.addExpr(ref, expr);
+ }
+
+ return builder.build();
+ }
+
+
+
+
+ private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) {
+ List<LogicalExpression> args = Lists.newArrayList();
+ for(Integer i : call.getArgList()){
+ args.add(new FieldReference(fn.get(i)));
+ }
+
+ // for count(1).
+ if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1L));
+ LogicalExpression expr = implementor.getContext().getRegistry().createExpression(call.getAggregation().getName().toLowerCase(), ExpressionPosition.UNKNOWN, args);
+ return expr;
+ }
+
+ public static DrillAggregateRel convert(GroupingAggregate groupBy, ConversionContext value)
+ throws InvalidRelException {
+ throw new UnsupportedOperationException();
+ }
+
+}
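
A sketch of the count(1) convention in toDrill above: an aggregate call with an empty argument list (COUNT(*)) is given the literal 1L as its single argument before the Drill function expression is created. Names below are hypothetical.

    import java.util.ArrayList;
    import java.util.List;

    public class AggArgsSketch {
        static List<Object> toArgs(List<Integer> argList, List<String> childFields) {
            List<Object> args = new ArrayList<>();
            for (Integer i : argList) args.add(childFields.get(i)); // one field reference per arg
            if (args.isEmpty()) args.add(1L);                       // for count(1)
            return args;
        }
        public static void main(String[] args) {
            System.out.println(toArgs(List.of(), List.of("a", "b"))); // [1]
        }
    }
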
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
new file mode 100644
index 000000000..77f1ba6d3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.AggregateRel;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import java.util.logging.Logger;
+
+/**
+ * Rule that converts an {@link AggregateRel} to a {@link DrillAggregateRel}, implemented by a Drill "segment" operation
+ * followed by a "collapseaggregate" operation.
+ */
+public class DrillAggregateRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillAggregateRule();
+ protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+ private DrillAggregateRule() {
+ super(RelOptHelper.some(AggregateRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillAggregateRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final AggregateRel aggregate = (AggregateRel) call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = aggregate.getTraitSet().plus(DrillRel.CONVENTION);
+ final RelNode convertedInput = convert(input, traits);
+ try {
+ call.transformTo(new DrillAggregateRel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
+ aggregate.getAggCallList()));
+ } catch (InvalidRelException e) {
+ tracer.warning(e.toString());
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
new file mode 100644
index 000000000..cc147e315
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.FilterRelBase;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+
+/**
+ * Filter implemented in Drill.
+ */
+public class DrillFilterRel extends FilterRelBase implements DrillRel {
+ protected DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+ super(cluster, traits, child, condition);
+ assert getConvention() == CONVENTION;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillFilterRel(getCluster(), traitSet, sole(inputs), getCondition());
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(0.1);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final LogicalOperator input = implementor.visitChild(this, 0, getChild());
+ Filter f = new Filter(DrillOptiq.toDrill(implementor.getContext(), getChild(), getCondition()));
+ f.setInput(input);
+ return f;
+ }
+
+ public static DrillFilterRel convert(Filter filter, ConversionContext context) throws InvalidRelException{
+ RelNode input = context.toRel(filter.getInput());
+ return new DrillFilterRel(context.getCluster(), context.getLogicalTraits(), input, context.toRex(filter.getExpr()));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java
new file mode 100644
index 000000000..d4fb2391a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.FilterRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+
+/**
+ * Rule that converts a {@link org.eigenbase.rel.FilterRel} to a Drill "filter" operation.
+ */
+public class DrillFilterRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillFilterRule();
+
+ private DrillFilterRule() {
+ super(RelOptHelper.some(FilterRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillFilterRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final FilterRel filter = (FilterRel) call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = filter.getTraitSet().plus(DrillRel.CONVENTION);
+ final RelNode convertedInput = convert(input, traits);
+ call.transformTo(new DrillFilterRel(filter.getCluster(), traits, convertedInput, filter.getCondition()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java
new file mode 100644
index 000000000..acd218c25
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Set;
+
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.LogicalPlanBuilder;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.common.logical.PlanProperties.PlanType;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+import org.eigenbase.rel.RelNode;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Context for converting a tree of {@link DrillRel} nodes into a Drill logical plan.
+ */
+public class DrillImplementor {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillImplementor.class);
+
+ private Set<DrillTable> tables = Sets.newHashSet();
+ private LogicalPlanBuilder planBuilder = new LogicalPlanBuilder();
+ private LogicalPlan plan;
+ private final DrillParseContext context;
+
+
+ public DrillImplementor(DrillParseContext context, ResultMode mode) {
+ planBuilder.planProperties(PlanType.APACHE_DRILL_LOGICAL, 1, DrillImplementor.class.getName(), "", mode);
+ this.context = context;
+ }
+
+ public DrillParseContext getContext(){
+ return context;
+ }
+
+ public void registerSource(DrillTable table){
+ if(tables.add(table)){
+ planBuilder.addStorageEngine(table.getStorageEngineName(), table.getStorageEngineConfig());
+ }
+ }
+
+ public void go(DrillRel root) {
+ LogicalOperator rootLOP = root.implement(this);
+ rootLOP.accept(new AddOpsVisitor(), null);
+ }
+
+ public LogicalPlan getPlan(){
+ if(plan == null){
+ plan = planBuilder.build();
+ planBuilder = null;
+ }
+ return plan;
+ }
+
+ public LogicalOperator visitChild(DrillRel parent, int ordinal, RelNode child) {
+ return ((DrillRel) child).implement(this);
+ }
+
+ private class AddOpsVisitor extends AbstractLogicalVisitor<Void, Void, RuntimeException> {
+ @Override
+ public Void visitOp(LogicalOperator op, Void value) throws RuntimeException {
+ planBuilder.addLogicalOperator(op);
+ for(LogicalOperator o : op){
+ o.accept(this, null);
+ }
+ return null;
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
new file mode 100644
index 000000000..c08712ef6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexUtil;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+import org.eigenbase.util.Pair;
+
+/**
+ * Join implemented in Drill.
+ */
+public class DrillJoinRel extends JoinRelBase implements DrillRel {
+ private final List<Integer> leftKeys = new ArrayList<>();
+ private final List<Integer> rightKeys = new ArrayList<>();
+
+ /** Creates a DrillJoinRel. */
+ public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType, Collections.<String> emptySet());
+
+ RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+ if (!remaining.isAlwaysTrue()) {
+ throw new InvalidRelException("DrillJoinRel only supports equi-join");
+ }
+ }
+
+ @Override
+ public DrillJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType) {
+ try {
+ return new DrillJoinRel(getCluster(), traitSet, left, right, condition, joinType);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final List<String> fields = getRowType().getFieldNames();
+ assert isUnique(fields);
+ final int leftCount = left.getRowType().getFieldCount();
+ final List<String> leftFields = fields.subList(0, leftCount);
+ final List<String> rightFields = fields.subList(leftCount, fields.size());
+
+ final LogicalOperator leftOp = implementInput(implementor, 0, 0, left);
+ final LogicalOperator rightOp = implementInput(implementor, 1, leftCount, right);
+
+ Join.Builder builder = Join.builder();
+ builder.type(joinType);
+ builder.left(leftOp);
+ builder.right(rightOp);
+
+ for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+ builder.addCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Check to make sure that the fields of the inputs are the same as the output field names. If not, insert a project renaming them.
+ * @param implementor
+ * @param i
+ * @param offset
+ * @param input
+ * @return
+ */
+ private LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, RelNode input) {
+ final LogicalOperator inputOp = implementor.visitChild(this, i, input);
+ assert uniqueFieldNames(input.getRowType());
+ final List<String> fields = getRowType().getFieldNames();
+ final List<String> inputFields = input.getRowType().getFieldNames();
+ final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+ if (!outputFields.equals(inputFields)) {
+ // Ensure that input field names are the same as output field names.
+ // If there are duplicate field names on left and right, fields will get
+ // lost.
+ return rename(implementor, inputOp, inputFields, outputFields);
+ } else {
+ return inputOp;
+ }
+ }
+
+ private LogicalOperator rename(DrillImplementor implementor, LogicalOperator inputOp, List<String> inputFields, List<String> outputFields) {
+ Project.Builder builder = Project.builder();
+ builder.setInput(inputOp);
+ for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
+ builder.addExpr(new FieldReference(pair.right), new FieldReference(pair.left));
+ }
+ return builder.build();
+ }
+
+ public static DrillJoinRel convert(Join join, ConversionContext context) throws InvalidRelException{
+ RelNode left = context.toRel(join.getLeft());
+ RelNode right = context.toRel(join.getRight());
+
+ List<RexNode> joinConditions = new ArrayList<RexNode>();
+ // right fields appear after the LHS fields.
+ final int rightInputOffset = left.getRowType().getFieldCount();
+ for (JoinCondition condition : join.getConditions()) {
+ RelDataTypeField leftField = left.getRowType().getField(ExprHelper.getFieldName(condition.getLeft()), true);
+ RelDataTypeField rightField = right.getRowType().getField(ExprHelper.getFieldName(condition.getRight()), true);
+ joinConditions.add(
+ context.getRexBuilder().makeCall(
+ SqlStdOperatorTable.EQUALS,
+ context.getRexBuilder().makeInputRef(leftField.getType(), leftField.getIndex()),
+ context.getRexBuilder().makeInputRef(rightField.getType(), rightInputOffset + rightField.getIndex())
+ )
+ );
+ }
+ RexNode rexCondition = RexUtil.composeConjunction(context.getRexBuilder(), joinConditions, false);
+ return new DrillJoinRel(context.getCluster(), context.getLogicalTraits(), left, right, rexCondition, join.getJoinType());
+ }
+
+
+ /**
+ * Returns whether there are any elements in common between left and right.
+ */
+ private static <T> boolean intersects(List<T> left, List<T> right) {
+ return new HashSet<>(left).removeAll(right);
+ }
+
+ private boolean uniqueFieldNames(RelDataType rowType) {
+ return isUnique(rowType.getFieldNames());
+ }
+
+ private static <T> boolean isUnique(List<T> list) {
+ return new HashSet<>(list).size() == list.size();
+ }
+
+}
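
A self-contained sketch of the rename step above: when an input's field names differ from the corresponding slice of the join's output row type, a Project is inserted that maps each old name to its new name pairwise, in the spirit of Pair.zip(inputFields, outputFields).

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class RenameSketch {
        static Map<String, String> renameMap(List<String> in, List<String> out) {
            Map<String, String> m = new LinkedHashMap<>();
            for (int i = 0; i < in.size(); i++) {
                m.put(in.get(i), out.get(i)); // addExpr(new FieldReference(out), new FieldReference(in))
            }
            return m;
        }
        public static void main(String[] args) {
            System.out.println(renameMap(List.of("id", "name"), List.of("id0", "name0")));
            // {id=id0, name=name0}
        }
    }
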
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
new file mode 100644
index 000000000..f32fa590f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import java.util.logging.Logger;
+
+/**
+ * Rule that converts a {@link JoinRel} to a {@link DrillJoinRel}, implemented by a Drill "join" operation.
+ */
+public class DrillJoinRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillJoinRule();
+ protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+ private DrillJoinRule() {
+ super(
+ RelOptHelper.some(JoinRel.class, Convention.NONE, RelOptHelper.any(RelNode.class), RelOptHelper.any(RelNode.class)),
+ "DrillJoinRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final JoinRel join = (JoinRel) call.rel(0);
+ final RelNode left = call.rel(1);
+ final RelNode right = call.rel(2);
+ final RelTraitSet traits = join.getTraitSet().plus(DrillRel.CONVENTION);
+
+ final RelNode convertedLeft = convert(left, traits);
+ final RelNode convertedRight = convert(right, traits);
+ try {
+ call.transformTo(new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, join.getCondition(),
+ join.getJoinType()));
+ } catch (InvalidRelException e) {
+ tracer.warning(e.toString());
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
new file mode 100644
index 000000000..54be0527e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.type.SqlTypeName;
+
+public class DrillLimitRel extends SingleRel implements DrillRel {
+ private RexNode offset;
+ private RexNode fetch;
+
+ public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, child);
+ this.offset = offset;
+ this.fetch = fetch;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator inputOp = implementor.visitChild(this, 0, getChild());
+
+ // First offset to include into results (inclusive). Null implies it is starting from offset 0
+ int first = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0;
+
+ // Last offset to stop including into results (exclusive), translating fetch row counts into an offset.
+ // Null value implies including entire remaining result set from first offset
+ Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
+ Limit limit = new Limit(first, last);
+ limit.setInput(inputOp);
+ return limit;
+ }
+
+ public static DrillLimitRel convert(Limit limit, ConversionContext context) throws InvalidRelException{
+ RelNode input = context.toRel(limit.getInput());
+ RexNode first = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getFirst()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ RexNode last = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getLast()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ return new DrillLimitRel(context.getCluster(), context.getLogicalTraits(), input, first, last);
+ }
+
+
+}
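
A sketch of the offset/fetch translation in implement() above: first is the inclusive start offset (defaulting to 0), and last is the exclusive end, left null when the rest of the input should be read.

    public class LimitRangeSketch {
        static int first(Integer offset) {
            return offset != null ? Math.max(0, offset) : 0;
        }
        static Integer last(Integer fetch, int first) {
            return fetch != null ? Math.max(0, fetch) + first : null; // null = to end of input
        }
        public static void main(String[] args) {
            int f = first(10);                           // OFFSET 10
            System.out.println(f + " .. " + last(5, f)); // FETCH 5 -> prints 10 .. 15
        }
    }
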
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
new file mode 100644
index 000000000..85c594e45
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * This rule converts a SortRel that has an offset, a fetch, or both into a Drill Sort and Limit Rel
+ */
+public class DrillLimitRule extends RelOptRule {
+ public static DrillLimitRule INSTANCE = new DrillLimitRule();
+
+ private DrillLimitRule() {
+ super(RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillLimitRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final SortRel sort = call.rel(0);
+ return sort.offset != null || sort.fetch != null;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final SortRel incomingSort = call.rel(0);
+ final RelTraitSet incomingTraits = incomingSort.getTraitSet();
+ RelNode input = incomingSort.getChild();
+
+ // if the Optiq sort rel includes a collation and a limit, we need to create a copy of the sort rel that
+ // excludes the limit information.
+ if (!incomingSort.getCollation().getFieldCollations().isEmpty()) {
+ input = incomingSort.copy(incomingTraits, input, incomingSort.getCollation(), null, null);
+ }
+
+ RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.CONVENTION));
+ call.transformTo(new DrillLimitRel(incomingSort.getCluster(), incomingTraits.plus(DrillRel.CONVENTION), convertedInput, incomingSort.offset, incomingSort.fetch));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
new file mode 100644
index 000000000..2e29bd425
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.exec.record.NullExpression;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexCorrelVariable;
+import org.eigenbase.rex.RexDynamicParam;
+import org.eigenbase.rex.RexFieldAccess;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexLocalRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.eigenbase.rex.RexRangeRef;
+import org.eigenbase.rex.RexVisitorImpl;
+import org.eigenbase.sql.SqlSyntax;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Utilities for Drill's planner.
+ */
+public class DrillOptiq {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOptiq.class);
+
+ /**
+ * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax.
+ */
+ static LogicalExpression toDrill(DrillParseContext context, RelNode input, RexNode expr) {
+ final RexToDrill visitor = new RexToDrill(context, input);
+ return expr.accept(visitor);
+ }
+
+ private static class RexToDrill extends RexVisitorImpl<LogicalExpression> {
+ private final RelNode input;
+ private final DrillParseContext context;
+
+ RexToDrill(DrillParseContext context, RelNode input) {
+ super(true);
+ this.context = context;
+ this.input = input;
+ }
+
+ @Override
+ public LogicalExpression visitInputRef(RexInputRef inputRef) {
+ final int index = inputRef.getIndex();
+ final RelDataTypeField field = input.getRowType().getFieldList().get(index);
+ return new FieldReference(field.getName());
+ }
+
+ @Override
+ public LogicalExpression visitCall(RexCall call) {
+ logger.debug("RexCall {}, {}", call);
+ final SqlSyntax syntax = call.getOperator().getSyntax();
+ switch (syntax) {
+ case BINARY:
+ logger.debug("Binary");
+ LogicalExpression op1 = call.getOperands().get(0).accept(this);
+ LogicalExpression op2 = call.getOperands().get(1).accept(this);
+ return context.getRegistry().createExpression(call.getOperator().getName(), Lists.newArrayList(op1, op2));
+ case FUNCTION:
+ logger.debug("Function");
+ List<LogicalExpression> exprs = Lists.newArrayList();
+ for(RexNode n : call.getOperands()){
+ exprs.add(n.accept(this));
+ }
+ return context.getRegistry().createExpression(call.getOperator().getName().toLowerCase(), Lists.newArrayList(exprs));
+ case SPECIAL:
+ logger.debug("Special");
+ switch(call.getKind()){
+
+ case CAST:
+ return getDrillCastFunctionFromOptiq(call);
+ }
+
+ if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+ SchemaPath left = (SchemaPath) call.getOperands().get(0).accept(this);
+ final RexLiteral literal = (RexLiteral) call.getOperands().get(1);
+ return left.getChild((String) literal.getValue2());
+ }
+
+ // fall through
+ default:
+ throw new AssertionError("todo: implement syntax " + syntax + "(" + call + ")");
+ }
+ }
+
+ private LogicalExpression doUnknown(Object o){
+ logger.warn("Doesn't currently support consumption of {}.", o);
+ return NullExpression.INSTANCE;
+ }
+ @Override
+ public LogicalExpression visitLocalRef(RexLocalRef localRef) {
+ return doUnknown(localRef);
+ }
+
+ @Override
+ public LogicalExpression visitOver(RexOver over) {
+ return doUnknown(over);
+ }
+
+ @Override
+ public LogicalExpression visitCorrelVariable(RexCorrelVariable correlVariable) {
+ return doUnknown(correlVariable);
+ }
+
+ @Override
+ public LogicalExpression visitDynamicParam(RexDynamicParam dynamicParam) {
+ return doUnknown(dynamicParam);
+ }
+
+ @Override
+ public LogicalExpression visitRangeRef(RexRangeRef rangeRef) {
+ return doUnknown(rangeRef);
+ }
+
+ @Override
+ public LogicalExpression visitFieldAccess(RexFieldAccess fieldAccess) {
+ return super.visitFieldAccess(fieldAccess);
+ }
+
+
+ private LogicalExpression getDrillCastFunctionFromOptiq(RexCall call){
+ LogicalExpression arg = call.getOperands().get(0).accept(this);
+ List<LogicalExpression> args = Collections.singletonList(arg);
+ String fname = null;
+ switch(call.getType().getSqlTypeName().getName()){
+ case "VARCHAR": {
+ args = Lists.newArrayList(arg, new LongExpression(call.getType().getPrecision()));
+ return context.getRegistry().createExpression("castVARCHAR", args);
+ }
+ case "INTEGER": fname = "castINT"; break;
+ case "FLOAT": fname = "castFLOAT4"; break;
+ case "DOUBLE": fname = "castFLOAT8"; break;
+ case "DECIMAL": throw new UnsupportedOperationException("Need to add decimal.");
+ default: fname = "cast" + call.getType().getSqlTypeName().getName();
+ }
+ return context.getRegistry().createExpression(fname, args);
+
+ }
+
+
+
+ @Override
+ public LogicalExpression visitLiteral(RexLiteral literal) {
+ switch(literal.getTypeName()){
+ case BIGINT:
+ long l = ((BigDecimal) literal.getValue()).longValue();
+ return ValueExpressions.getBigInt(l);
+ case BOOLEAN:
+ return ValueExpressions.getBit(((Boolean) literal.getValue()));
+ case CHAR:
+ return ValueExpressions.getChar(((String) literal.getValue()));
+ case DOUBLE:
+ double d = ((BigDecimal) literal.getValue()).doubleValue();
+ return ValueExpressions.getFloat8(d);
+ case FLOAT:
+ float f = ((BigDecimal) literal.getValue()).floatValue();
+ return ValueExpressions.getFloat4(f);
+ case INTEGER:
+ case DECIMAL:
+ int i = ((BigDecimal) literal.getValue()).intValue();
+ return ValueExpressions.getInt(i);
+ case VARCHAR:
+ return ValueExpressions.getChar(((String) literal.getValue()));
+ default:
+ throw new UnsupportedOperationException(String.format("Unable to convert the value of %s and type %s to a Drill constant expression.", literal, literal.getTypeName()));
+ }
+ }
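+
+ // Illustrative examples (not part of the original change): the literal 5
+ // (INTEGER) becomes ValueExpressions.getInt(5), 5.0 (DOUBLE) becomes
+ // getFloat8(5.0), and 'abc' (CHAR or VARCHAR) becomes getChar("abc").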
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java
new file mode 100644
index 000000000..b82fef56a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.common.expression.FunctionRegistry;
+
+public class DrillParseContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParseContext.class);
+
+ private final FunctionRegistry registry;
+
+ public DrillParseContext(FunctionRegistry registry) {
+ super();
+ this.registry = registry;
+ }
+
+ public FunctionRegistry getRegistry(){
+ return registry;
+ }
+
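+ // Usage sketch (illustrative): conversion code keeps one DrillParseContext per
+ // planning session and resolves Drill functions through it, e.g.
+ //   LogicalExpression e = context.getRegistry().createExpression("add", args);
+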
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
new file mode 100644
index 000000000..ee3a0f45a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.reltype.RelDataTypeFieldImpl;
+import org.eigenbase.reltype.RelRecordType;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.type.SqlTypeName;
+import org.eigenbase.util.Pair;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Project implemented in Drill.
+ */
+public class DrillProjectRel extends ProjectRelBase implements DrillRel {
+ protected DrillProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
+ RelDataType rowType) {
+ super(cluster, traits, child, exps, rowType, Flags.BOXED);
+ assert getConvention() == CONVENTION;
+ }
+
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillProjectRel(getCluster(), traitSet, sole(inputs), new ArrayList<RexNode>(exps), rowType);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(0.1);
+ }
+
+ private List<Pair<RexNode, String>> projects() {
+ return Pair.zip(exps, getRowType().getFieldNames());
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator inputOp = implementor.visitChild(this, 0, getChild());
+ Project.Builder builder = Project.builder();
+ builder.setInput(inputOp);
+ for (Pair<RexNode, String> pair : projects()) {
+ LogicalExpression expr = DrillOptiq.toDrill(implementor.getContext(), getChild(), pair.left);
+ builder.addExpr(new FieldReference("output." + pair.right), expr);
+ }
+ return builder.build();
+ }
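+
+ // Illustrative example (not part of the original change): a projection of
+ // a + b AS c emits a NamedExpression binding FieldReference "output.c" to the
+ // converted add(a, b) expression, with the child operator as its input.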
+
+ public static DrillProjectRel convert(Project project, ConversionContext context) throws InvalidRelException{
+ RelNode input = context.toRel(project.getInput());
+ List<RelDataTypeField> fields = Lists.newArrayList();
+ List<RexNode> exps = Lists.newArrayList();
+ for(NamedExpression expr : project.getSelections()){
+ fields.add(new RelDataTypeFieldImpl(expr.getRef().getPath().toString(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) ));
+ exps.add(context.toRex(expr.getExpr()));
+ }
+ return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java
new file mode 100644
index 000000000..bf5bcffc7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * Rule that converts a {@link org.eigenbase.rel.ProjectRel} to a Drill "project" operation.
+ */
+public class DrillProjectRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillProjectRule();
+
+ private DrillProjectRule() {
+ super(RelOptHelper.some(ProjectRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillProjectRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ProjectRel project = (ProjectRel) call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = project.getTraitSet().plus(DrillRel.CONVENTION);
+ final RelNode convertedInput = convert(input, traits);
+ call.transformTo(new DrillProjectRel(project.getCluster(), traits, convertedInput, project.getProjects(), project
+ .getRowType()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
new file mode 100644
index 000000000..63a7207c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.Convention;
+
+/**
+ * Relational expression that is implemented in Drill.
+ */
+public interface DrillRel extends RelNode {
+ /** Calling convention for relational expressions that are "implemented" by
+ * generating Drill logical plans. */
+ Convention CONVENTION = new Convention.Impl("DRILL", DrillRel.class);
+
+ LogicalOperator implement(DrillImplementor implementor);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
new file mode 100644
index 000000000..53da67f6c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Iterator;
+
+import net.hydromatic.optiq.tools.RuleSet;
+
+import org.eigenbase.rel.rules.MergeProjectRule;
+import org.eigenbase.rel.rules.PushFilterPastJoinRule;
+import org.eigenbase.rel.rules.PushFilterPastProjectRule;
+import org.eigenbase.rel.rules.PushJoinThroughJoinRule;
+import org.eigenbase.rel.rules.PushSortPastProjectRule;
+import org.eigenbase.rel.rules.ReduceAggregatesRule;
+import org.eigenbase.rel.rules.RemoveDistinctAggregateRule;
+import org.eigenbase.rel.rules.RemoveDistinctRule;
+import org.eigenbase.rel.rules.RemoveSortRule;
+import org.eigenbase.rel.rules.RemoveTrivialCalcRule;
+import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
+import org.eigenbase.rel.rules.SwapJoinRule;
+import org.eigenbase.rel.rules.TableAccessRule;
+import org.eigenbase.rel.rules.UnionToDistinctRule;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.volcano.AbstractConverter.ExpandConversionRule;
+
+import com.google.common.collect.ImmutableSet;
+
+public class DrillRuleSets {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class);
+
+ public static final RuleSet DRILL_BASIC_RULES = new DrillRuleSet(ImmutableSet.of( //
+
+// ExpandConversionRule.instance,
+// SwapJoinRule.instance,
+// RemoveDistinctRule.instance,
+// UnionToDistinctRule.instance,
+// RemoveTrivialProjectRule.instance,
+// RemoveTrivialCalcRule.instance,
+// RemoveSortRule.INSTANCE,
+//
+// TableAccessRule.instance, //
+// MergeProjectRule.instance, //
+// PushFilterPastProjectRule.instance, //
+// PushFilterPastJoinRule.FILTER_ON_JOIN, //
+// RemoveDistinctAggregateRule.instance, //
+// ReduceAggregatesRule.instance, //
+// SwapJoinRule.instance, //
+// PushJoinThroughJoinRule.RIGHT, //
+// PushJoinThroughJoinRule.LEFT, //
+// PushSortPastProjectRule.INSTANCE, //
+
+ DrillScanRule.INSTANCE,
+ DrillFilterRule.INSTANCE,
+ DrillProjectRule.INSTANCE,
+ DrillAggregateRule.INSTANCE,
+
+ DrillLimitRule.INSTANCE,
+ DrillSortRule.INSTANCE,
+ DrillJoinRule.INSTANCE,
+ DrillUnionRule.INSTANCE
+ ));
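+
+ // Usage sketch (assumed, not part of the original change): a planner consumes a
+ // RuleSet by iterating it, e.g.
+ //   for (RelOptRule rule : DrillRuleSets.DRILL_BASIC_RULES) planner.addRule(rule);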
+
+
+ private static class DrillRuleSet implements RuleSet{
+ final ImmutableSet<RelOptRule> rules;
+
+ public DrillRuleSet(ImmutableSet<RelOptRule> rules) {
+ super();
+ this.rules = rules;
+ }
+
+ @Override
+ public Iterator<RelOptRule> iterator() {
+ return rules.iterator();
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
new file mode 100644
index 000000000..afc2d1bd4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.TableAccessRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * GroupScan of a Drill table.
+ */
+public class DrillScanRel extends TableAccessRelBase implements DrillRel {
+ private final DrillTable drillTable;
+
+ /** Creates a DrillScan. */
+ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table) {
+ super(cluster, traits, table);
+ assert getConvention() == CONVENTION;
+ this.drillTable = table.unwrap(DrillTable.class);
+ assert drillTable != null;
+ }
+
+// @Override
+// public void register(RelOptPlanner planner) {
+// super.register(planner);
+// DrillOptiq.registerStandardPlannerRules(planner);
+// }
+
+ public LogicalOperator implement(DrillImplementor implementor) {
+ Scan.Builder builder = Scan.builder();
+ builder.storageEngine(drillTable.getStorageEngineName());
+ builder.selection(new JSONOptions(drillTable.getSelection()));
+ //builder.outputReference(new FieldReference("_MAP"));
+ implementor.registerSource(drillTable);
+ return builder.build();
+ }
+
+ public static DrillScanRel convert(Scan scan, ConversionContext context){
+ return new DrillScanRel(context.getCluster(), context.getLogicalTraits(), context.getTable(scan));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java
new file mode 100644
index 000000000..58e648a73
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel;
+
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class DrillScanRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillScanRule();
+
+ private DrillScanRule() {
+ super(RelOptHelper.any(EnumerableTableAccessRel.class), "DrillTableRule");
+ }
+
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final EnumerableTableAccessRel access = (EnumerableTableAccessRel) call.rel(0);
+ final RelTraitSet traits = access.getTraitSet().plus(DrillRel.CONVENTION);
+ call.transformTo(new DrillScanRel(access.getCluster(), traits, access.getTable()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java
new file mode 100644
index 000000000..829947adb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import net.hydromatic.optiq.impl.java.JavaTypeFactory;
+import net.hydromatic.optiq.rules.java.JavaRowFormat;
+import net.hydromatic.optiq.rules.java.PhysType;
+import net.hydromatic.optiq.rules.java.PhysTypeImpl;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Store;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Relational expression that converts from Drill to Enumerable. At runtime it executes a Drill query and returns the
+ * results as an {@link net.hydromatic.linq4j.Enumerable}.
+ */
+public class DrillScreenRel extends SingleRel implements DrillRel {
+ private static final Logger logger = LoggerFactory.getLogger(DrillScreenRel.class);
+
+ private PhysType physType;
+
+ public DrillScreenRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
+ super(cluster, traitSet, input);
+ assert input.getConvention() == DrillRel.CONVENTION;
+ physType = PhysTypeImpl.of((JavaTypeFactory) cluster.getTypeFactory(), input.getRowType(), JavaRowFormat.ARRAY);
+ }
+
+ public PhysType getPhysType() {
+ return physType;
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillScreenRel(getCluster(), traitSet, sole(inputs));
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator childOp = implementor.visitChild(this, 0, getChild());
+ return Store.builder().setInput(childOp).storageEngine("--SCREEN--").build();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
new file mode 100644
index 000000000..04af9d5b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelCollation;
+import org.eigenbase.rel.RelCollationImpl;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Sort implemented in Drill.
+ */
+public class DrillSortRel extends SortRel implements DrillRel {
+
+ /** Creates a DrillSortRel. */
+ public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
+ super(cluster, traits, input, collation);
+ }
+
+ /** Creates a DrillSortRel with offset and fetch. */
+ public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, input, collation, offset, fetch);
+ }
+
+ @Override
+ public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ return new DrillSortRel(getCluster(), traitSet, input, collation, offset, fetch);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final Order.Builder builder = Order.builder();
+ builder.setInput(implementor.visitChild(this, 0, getChild()));
+
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ for(RelFieldCollation fieldCollation : this.collation.getFieldCollations()){
+ builder.addOrdering(fieldCollation.getDirection(),
+ new FieldReference(childFields.get(fieldCollation.getFieldIndex())),
+ fieldCollation.nullDirection);
+ }
+ return builder.build();
+ }
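+
+ // Illustrative example (not part of the original change): for ORDER BY name DESC,
+ // the collation entry resolves to the child field "name" and becomes
+ // builder.addOrdering(DESCENDING, new FieldReference("name"), nullDirection).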
+
+
+ public static RelNode convert(Order order, ConversionContext context) throws InvalidRelException{
+
+ // if there are compound expressions in the order by, we need to convert into projects on either side.
+ RelNode input = context.toRel(order.getInput());
+ List<String> fields = input.getRowType().getFieldNames();
+
+ // build a map of field names to indices.
+ Map<String, Integer> fieldMap = Maps.newHashMap();
+ int i = 0;
+ for(String field : fields){
+ fieldMap.put(field, i);
+ i++;
+ }
+
+ List<RelFieldCollation> collations = Lists.newArrayList();
+
+ for(Ordering o : order.getOrderings()){
+ String fieldName = ExprHelper.getFieldName(o.getExpr());
+ int fieldId = fieldMap.get(fieldName);
+ collations.add(new RelFieldCollation(fieldId, o.getDirection(), o.getNullDirection()));
+ }
+ return new DrillSortRel(context.getCluster(), context.getLogicalTraits(), input, RelCollationImpl.of(collations));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java
new file mode 100644
index 000000000..c968e8545
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+
+/**
+ * Rule that converts a {@link SortRel} to a {@link DrillSortRel}, implemented by a Drill "order" operation.
+ */
+public class DrillSortRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillSortRule();
+
+ private DrillSortRule() {
+ super(RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillSortRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final SortRel sort = call.rel(0);
+ return sort.offset == null && sort.fetch == null;
+ }
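+
+ // Sorts carrying OFFSET or FETCH are deliberately skipped here; presumably the
+ // limit portion is meant to be handled separately (e.g. by DrillLimitRule).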
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+
+ final SortRel sort = call.rel(0);
+
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION);
+
+ final RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.CONVENTION));
+ call.transformTo(new DrillSortRel(sort.getCluster(), traits, convertedInput, sort.getCollation()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java
new file mode 100644
index 000000000..30c78109e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import net.hydromatic.optiq.prepare.Prepare.CatalogReader;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.TableModificationRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class DrillStoreRel extends TableModificationRelBase implements DrillRel{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillStoreRel.class);
+
+ protected DrillStoreRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, CatalogReader catalogReader,
+ RelNode child, Operation operation, List<String> updateColumnList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
+
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ // Apparently a placeholder: store conversion is not implemented, so no logical operator is produced.
+ return null;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
new file mode 100644
index 000000000..30dd48de2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collections;
+
+import net.hydromatic.optiq.Schema.TableType;
+import net.hydromatic.optiq.Statistic;
+import net.hydromatic.optiq.Statistics;
+import net.hydromatic.optiq.Table;
+
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+/** Optiq Table used by Drill. */
+public class DrillTable implements Table{
+
+ private final String name;
+ private final String storageEngineName;
+ public final StorageEngineConfig storageEngineConfig;
+ private Object selection;
+
+
+ /** Creates a DrillTable. */
+ public DrillTable(String name, String storageEngineName, Object selection, StorageEngineConfig storageEngineConfig) {
+ this.name = name;
+ this.selection = selection;
+ this.storageEngineConfig = storageEngineConfig;
+ this.storageEngineName = storageEngineName;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public StorageEngineConfig getStorageEngineConfig(){
+ return storageEngineConfig;
+ }
+
+ public Object getSelection() {
+ return selection;
+ }
+
+ public String getStorageEngineName() {
+ return storageEngineName;
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
+ return new DrillScanRel(context.getCluster(),
+ context.getCluster().traitSetOf(DrillRel.CONVENTION),
+ table);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return new RelDataTypeDrillImpl(typeFactory);
+ }
+
+ @Override
+ public TableType getJdbcTableType() {
+ return null;
+ }
+
+
+
+// /** Factory for custom tables in Optiq schema. */
+// @SuppressWarnings("UnusedDeclaration")
+// public static class Factory implements TableFactory<DrillTable> {
+//
+// @Override
+// public DrillTable create(Schema schema, String name, Map<String, Object> operand, RelDataType rowType) {
+//
+// final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig();
+// final ClasspathInputConfig inputConfig = new ClasspathInputConfig();
+// inputConfig.path = "/" + name.toLowerCase() + ".json";
+// inputConfig.type = DataWriter.ConverterType.JSON;
+// return createTable(schema.getTypeFactory(), (MutableSchema) schema, name, "donuts-json", rseConfig, inputConfig);
+// }
+// }
+
+
+
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
new file mode 100644
index 000000000..1be9cafea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.UnionRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * Union implemented in Drill.
+ */
+public class DrillUnionRel extends UnionRelBase implements DrillRel {
+ /** Creates a DrillUnionRel. */
+ public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
+ List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
+ }
+
+ @Override
+ public DrillUnionRel copy(RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
+ return new DrillUnionRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ // divide cost by two to ensure cheaper than EnumerableDrillRel
+ return super.computeSelfCost(planner).multiplyBy(.5);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ Union.Builder builder = Union.builder();
+ for (Ord<RelNode> input : Ord.zip(inputs)) {
+ builder.addInput(implementor.visitChild(this, input.i, input.e));
+ }
+ builder.setDistinct(!all);
+ return builder.build();
+ }
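+
+ // Note (illustrative): UNION maps to setDistinct(true) and UNION ALL to
+ // setDistinct(false), since Drill's Union operator expresses distinctness
+ // rather than "all".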
+
+ public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
new file mode 100644
index 000000000..bc1e6f471
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.UnionRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rule that converts a {@link UnionRel} to a {@link DrillUnionRel}, implemented by a "union" operation.
+ */
+public class DrillUnionRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillUnionRule();
+
+ private DrillUnionRule() {
+ super(RelOptHelper.any(UnionRel.class, Convention.NONE), "DrillUnionRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final UnionRel union = (UnionRel) call.rel(0);
+ final RelTraitSet traits = union.getTraitSet().plus(DrillRel.CONVENTION);
+ final List<RelNode> convertedInputs = new ArrayList<>();
+ for (RelNode input : union.getInputs()) {
+ final RelNode convertedInput = convert(input, traits);
+ convertedInputs.add(convertedInput);
+ }
+ call.transformTo(new DrillUnionRel(union.getCluster(), traits, convertedInputs, union.all));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
new file mode 100644
index 000000000..e77018184
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.ValuesRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+/**
+ * Values implemented in Drill.
+ */
+public class DrillValuesRel extends ValuesRelBase implements DrillRel {
+ protected DrillValuesRel(RelOptCluster cluster, RelDataType rowType, List<List<RexLiteral>> tuples, RelTraitSet traits) {
+ super(cluster, rowType, tuples, traits);
+ assert getConvention() == CONVENTION;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ assert inputs.isEmpty();
+ return new DrillValuesRel(getCluster(), rowType, tuples, traitSet);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(0.1);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ // Update when https://issues.apache.org/jira/browse/DRILL-57 fixed
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java
new file mode 100644
index 000000000..fae18caff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.ValuesRel;
+import org.eigenbase.relopt.*;
+
+/**
+ * Rule that converts a {@link ValuesRel} to a Drill "values" operation.
+ */
+public class DrillValuesRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillValuesRule();
+
+ private DrillValuesRule() {
+ super(RelOptHelper.any(ValuesRel.class, Convention.NONE), "DrillValuesRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ValuesRel values = (ValuesRel) call.rel(0);
+ final RelTraitSet traits = values.getTraitSet().plus(DrillRel.CONVENTION);
+ call.transformTo(new DrillValuesRel(values.getCluster(), values.getRowType(), values.getTuples(), traits));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java
new file mode 100644
index 000000000..2fcf6600c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import net.hydromatic.optiq.rules.java.EnumerableConvention;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.convert.ConverterRule;
+
+/**
+ * Rule that converts any Drill relational expression to enumerable format by adding a {@link DrillScreenRel}.
+ */
+public class EnumerableDrillRule extends ConverterRule {
+
+ public static final EnumerableDrillRule INSTANCE = new EnumerableDrillRule(EnumerableConvention.INSTANCE);
+
+
+ private EnumerableDrillRule(EnumerableConvention outConvention) {
+ super(RelNode.class, DrillRel.CONVENTION, outConvention, "EnumerableDrillRule." + outConvention);
+ }
+
+ @Override
+ public boolean isGuaranteed() {
+ return true;
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ assert rel.getTraitSet().contains(DrillRel.CONVENTION);
+ return new DrillScreenRel(rel.getCluster(), rel.getTraitSet().replace(getOutConvention()), rel);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
new file mode 100644
index 000000000..9456a8128
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+public class ExprHelper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExprHelper.class);
+
+ private final static String COMPOUND_FAIL_MESSAGE = "The current Optiq-based logical plan interpreter does not support complicated expressions in Order By or Filter clauses.";
+
+ public static String getAggregateFieldName(FunctionCall c){
+ List<LogicalExpression> exprs = c.args;
+ if(exprs.size() != 1) throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
+ return getFieldName(exprs.iterator().next());
+ }
+
+ public static String getFieldName(LogicalExpression e){
+ if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString();
+ throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
+ }
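+
+ // Illustrative examples (not part of the original change): for SUM(amount),
+ // getAggregateFieldName returns "amount"; for SUM(a + b) it throws, since only
+ // bare column references are supported here.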
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java
new file mode 100644
index 000000000..835e4a558
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.reltype.*;
+import org.eigenbase.sql.SqlCollation;
+import org.eigenbase.sql.SqlIdentifier;
+import org.eigenbase.sql.SqlIntervalQualifier;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/* We use an instance of this class as the row type for a
+ * Drill table. Since we don't know the schema beforehand,
+ * whenever Optiq asks us to validate that a field exists,
+ * we always return true and report the field's type as 'ANY'.
+ */
+public class RelDataTypeDrillImpl extends RelDataTypeImpl {
+
+ private RelDataTypeField defaultField;
+ RelDataTypeFactory typeFactory;
+ List<RelDataTypeField> drillFieldList = new LinkedList<>();
+ List<String> drillFieldNames = new LinkedList<>();
+
+
+ public RelDataTypeDrillImpl(RelDataTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ computeDigest();
+ }
+
+ @Override
+ public List<RelDataTypeField> getFieldList() {
+
+ if (drillFieldList.isEmpty()) {
+ /* By default we expose a single field of 'ANY' type
+ * (mainly for select * style queries)
+ */
+ defaultField = new RelDataTypeFieldImpl("*", 0, typeFactory.createSqlType(SqlTypeName.ANY));
+ drillFieldList.add(defaultField);
+ drillFieldNames.add("*");
+ }
+ return drillFieldList;
+ }
+
+
+ @Override
+ public int getFieldCount() {
+ return drillFieldList.size();
+ }
+
+// @Override
+// public int getFieldOrdinal(String fieldName, boolean caseSensitive) {
+//
+// /* Get the list of fields and return the
+// * index if the field exists
+// */
+// for (RelDataTypeField field : drillFieldList) {
+// if (field.getName().equals(fieldName))
+// return field.getIndex();
+// }
+//
+// /* Couldn't find the field in our list, return -1
+// * Unsure if I should add it to our list of fields
+// */
+// return -1;
+// }
+
+ /** Returns the named field, creating it with type 'ANY' if it does not already exist. */
+ @Override
+ public RelDataTypeField getField(String fieldName, boolean caseSensitive) {
+
+ /* First check if this field name exists in our field list */
+ for (RelDataTypeField field : drillFieldList)
+ {
+ if (field.getName().equals(fieldName))
+ return field;
+ }
+
+ /* This field does not exist in our field list add it */
+ RelDataTypeField newField = new RelDataTypeFieldImpl(fieldName, drillFieldList.size(), typeFactory.createSqlType(SqlTypeName.ANY));
+
+ /* Add it to the list of fields */
+ drillFieldList.add(newField);
+
+ /* Add the name to our list of field names */
+ drillFieldNames.add(fieldName);
+
+ return newField;
+ }
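+
+ /* Illustrative example (not part of the original change): for "select a, b from t",
+ * the validator looks up "a" and "b"; each lookup misses, so each field is
+ * appended with the next index and type ANY, and later lookups return the same
+ * field instance. */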
+
+
+ @Override
+ public List<String> getFieldNames() {
+ if (drillFieldNames.isEmpty()) {
+ drillFieldNames.add("*");
+ }
+ return drillFieldNames;
+ }
+
+ @Override
+ public SqlTypeName getSqlTypeName() {
+ return null;
+ }
+
+ @Override
+ protected void generateTypeString(StringBuilder sb, boolean withDetail) {
+ sb.append("DrillRecordRow");
+ }
+
+ @Override
+ public boolean isStruct() {
+ return true;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
new file mode 100644
index 000000000..92272f879
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleOperand;
+import org.eigenbase.relopt.RelTrait;
+
+public class RelOptHelper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RelOptHelper.class);
+
+ public static RelOptRuleOperand any(Class<? extends RelNode> first, RelTrait trait){
+ return RelOptRule.operand(first, trait, RelOptRule.any());
+ }
+
+ public static RelOptRuleOperand any(Class<? extends RelNode> first){
+ return RelOptRule.operand(first, RelOptRule.any());
+ }
+
+ public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){
+ return RelOptRule.operand(rel, RelOptRule.some(first, rest));
+ }
+
+ public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelTrait trait, RelOptRuleOperand first, RelOptRuleOperand... rest){
+ return RelOptRule.operand(rel, trait, RelOptRule.some(first, rest));
+ }
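+
+ // Usage sketch (illustrative): DrillSortRule builds its operand tree as
+ // RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)),
+ // i.e. a SortRel in the NONE convention over any child.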
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
new file mode 100644
index 000000000..980977085
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Constant;
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.SinkOperator;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This visitor walks a logical plan and records, in a map, the set of field references associated with each scan.
+ * These sets can then be used to rewrite each scan so that it appears to be explicitly fielded, for optimization
+ * purposes.
+ */
+public class ScanFieldDeterminer extends AbstractLogicalVisitor<Void, ScanFieldDeterminer.FieldList, RuntimeException> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanFieldDeterminer.class);
+
+ private FieldReferenceFinder finder = new FieldReferenceFinder();
+ private Map<Scan, FieldList> scanFields = Maps.newHashMap();
+
+
+ public static Map<Scan, FieldList> getFieldLists(LogicalPlan plan){
+ Collection<SinkOperator> ops = plan.getGraph().getRoots();
+ Preconditions.checkArgument(ops.size() == 1, "ScanFieldDeterminer currently only works with plans that have a single root.");
+ ScanFieldDeterminer sfd = new ScanFieldDeterminer();
+ ops.iterator().next().accept(sfd, new FieldList());
+ return sfd.scanFields;
+ }
+
+ private ScanFieldDeterminer(){
+ }
+
+ public static class FieldList {
+ private Set<SchemaPath> projected = Sets.newHashSet();
+ private Set<SchemaPath> referenced = Sets.newHashSet();
+
+ public void addProjected(SchemaPath path) {
+ projected.add(path);
+ }
+
+ public void addReferenced(SchemaPath path) {
+ referenced.add(path);
+ }
+
+ public void addReferenced(Collection<SchemaPath> paths) {
+ referenced.addAll(paths);
+ }
+
+ public void addProjected(Collection<SchemaPath> paths) {
+ projected.addAll(paths);
+ }
+
+ public FieldList clone() {
+ FieldList newList = new FieldList();
+ for (SchemaPath p : projected) {
+ newList.addProjected(p);
+ }
+ for (SchemaPath p : referenced) {
+ newList.addReferenced(p);
+ }
+ return newList;
+ }
+ }
+
+ @Override
+ public Void visitScan(Scan scan, FieldList value) {
+ if (value == null) {
+ scanFields.put(scan, new FieldList());
+ } else {
+ scanFields.put(scan, value);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitStore(Store store, FieldList value) {
+ store.getInput().accept(this, value);
+ return null;
+ }
+
+ @Override
+ public Void visitGroupingAggregate(GroupingAggregate groupBy, FieldList value) {
+ FieldList list = new FieldList();
+ for (NamedExpression e : groupBy.getExprs()) {
+ list.addProjected(e.getExpr().accept(finder, null));
+ }
+ for (NamedExpression e : groupBy.getKeys()) {
+ list.addProjected(e.getExpr().accept(finder, null));
+ }
+ groupBy.getInput().accept(this, list);
+ return null;
+ }
+
+ @Override
+ public Void visitFilter(Filter filter, FieldList value) {
+ value.addReferenced(filter.getExpr().accept(finder, null));
+ // descend so that scans beneath the filter are also visited
+ filter.getInput().accept(this, value);
+ return null;
+ }
+
+ @Override
+ public Void visitProject(Project project, FieldList value) {
+ FieldList fl = new FieldList();
+ for (NamedExpression e : project.getSelections()) {
+ fl.addProjected(e.getExpr().accept(finder, null));
+ }
+ // pass the projected fields down to the project's input
+ project.getInput().accept(this, fl);
+ return null;
+ }
+
+ @Override
+ public Void visitConstant(Constant constant, FieldList value) {
+ return null;
+ }
+
+ @Override
+ public Void visitOrder(Order order, FieldList fl) {
+ for (Ordering o : order.getOrderings()) {
+ fl.addReferenced(o.getExpr().accept(finder, null));
+ }
+ order.getInput().accept(this, fl);
+ return null;
+ }
+
+ @Override
+ public Void visitJoin(Join join, FieldList fl) {
+ {
+ FieldList leftList = fl.clone();
+ for (JoinCondition c : join.getConditions()) {
+ leftList.addReferenced(c.getLeft().accept(finder, null));
+ }
+ join.getLeft().accept(this, leftList);
+ }
+
+ {
+ FieldList rightList = fl.clone();
+ for (JoinCondition c : join.getConditions()) {
+ rightList.addReferenced(c.getRight().accept(finder, null));
+ }
+ join.getRight().accept(this, rightList);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitLimit(Limit limit, FieldList value) {
+ limit.getInput().accept(this, value);
+ return null;
+ }
+
+ @Override
+ public Void visitUnion(Union union, FieldList value) {
+ for (LogicalOperator o : union.getInputs()) {
+ o.accept(this, value.clone());
+ }
+ return null;
+ }
+
+
+ /**
+ * Search through a LogicalExpression, finding all internal schema path references and returning them in a set.
+ */
+ private class FieldReferenceFinder extends AbstractExprVisitor<Set<SchemaPath>, Void, RuntimeException> {
+
+ @Override
+ public Set<SchemaPath> visitSchemaPath(SchemaPath path, Void value) {
+ Set<SchemaPath> set = Sets.newHashSet();
+ set.add(path);
+ return set;
+ }
+
+ @Override
+ public Set<SchemaPath> visitUnknown(LogicalExpression e, Void value) {
+ Set<SchemaPath> paths = Sets.newHashSet();
+ for (LogicalExpression ex : e) {
+ paths.addAll(ex.accept(this, null));
+ }
+ return paths;
+ }
+
+ }
+}
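
A short sketch of the intended entry point, assuming a single-root LogicalPlan is already in hand ("ScanFieldExample" is illustrative): getFieldLists walks the plan once and yields the fields each scan must supply.

    import java.util.Map;

    import org.apache.drill.common.logical.LogicalPlan;
    import org.apache.drill.common.logical.data.Scan;
    import org.apache.drill.exec.planner.logical.ScanFieldDeterminer;
    import org.apache.drill.exec.planner.logical.ScanFieldDeterminer.FieldList;

    public class ScanFieldExample {
      // Prints the projected/referenced field sets collected for every scan.
      public static void printScanFields(LogicalPlan plan) {
        Map<Scan, FieldList> fields = ScanFieldDeterminer.getFieldLists(plan);
        for (Map.Entry<Scan, FieldList> e : fields.entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue());
        }
      }
    }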
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
new file mode 100644
index 000000000..a020ab392
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+public class StorageEngines implements Iterable<Map.Entry<String, StorageEngineConfig>>{
+
+ private Map<String, StorageEngineConfig> storage;
+
+ @JsonCreator
+ public StorageEngines(@JsonProperty("storage") Map<String, StorageEngineConfig> storage){
+ this.storage = storage;
+ }
+
+ public static void main(String[] args) throws Exception{
+ DrillConfig config = DrillConfig.create();
+ String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+ StorageEngines se = config.getMapper().readValue(data, StorageEngines.class);
+ System.out.println(se);
+ }
+
+ @Override
+ public String toString() {
+ final int maxLen = 10;
+ return "StorageEngines [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]";
+ }
+
+ @Override
+ public Iterator<Entry<String, StorageEngineConfig>> iterator() {
+ return storage.entrySet().iterator();
+ }
+
+ private String toString(Collection<?> collection, int maxLen) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ int i = 0;
+ for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
+ if (i > 0)
+ builder.append(", ");
+ builder.append(iterator.next());
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+
+}
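
A hedged sketch of the JSON shape this class deserializes: a single top-level "storage" key mapping engine names to their configs, per the @JsonProperty annotation above. The engine entry below is illustrative only; a real entry must match a registered StorageEngineConfig subtype.

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.planner.logical.StorageEngines;

    public class StorageEnginesJsonExample {
      public static void main(String[] args) throws Exception {
        DrillConfig config = DrillConfig.create();
        // hypothetical engine entry; the "type" value selects the config subtype
        String json = "{ \"storage\" : { \"parquet\" : { \"type\" : \"parquet\" } } }";
        StorageEngines engines = config.getMapper().readValue(json, StorageEngines.class);
        System.out.println(engines);
      }
    }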
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java
new file mode 100644
index 000000000..23d03b81c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import net.hydromatic.linq4j.function.Function1;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.drill.exec.store.SchemaProviderRegistry;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DrillSchemaFactory implements Function1<SchemaPlus, Schema>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSchemaFactory.class);
+
+ private final SchemaProviderRegistry registry;
+ private final Map<String, StorageEngineEntry> preEntries = Maps.newHashMap();
+
+ public static DrillSchemaFactory createEmpty(){
+ return new DrillSchemaFactory();
+ }
+
+ private DrillSchemaFactory(){
+ this.registry = null;
+ }
+
+ public DrillSchemaFactory(StorageEngines engines, DrillConfig config) throws SetupException {
+ super();
+ this.registry = new SchemaProviderRegistry(config);
+
+ for (Map.Entry<String, StorageEngineConfig> entry : engines) {
+ SchemaProvider provider = registry.getSchemaProvider(entry.getValue());
+ preEntries.put(entry.getKey(), new StorageEngineEntry(entry.getValue(), provider));
+ }
+
+ }
+
+ public Schema apply(SchemaPlus root) {
+ List<String> schemaNames = Lists.newArrayList();
+ Schema defaultSchema = null;
+ for(Entry<String, StorageEngineEntry> e : preEntries.entrySet()){
+ FileSystemSchema schema = new FileSystemSchema(e.getValue().getConfig(), e.getValue().getProvider(), root, e.getKey());
+ if(defaultSchema == null) defaultSchema = schema;
+ root.add(schema);
+ schemaNames.add(e.getKey());
+ }
+ logger.debug("Registered schemas for {}", schemaNames);
+ return defaultSchema;
+ }
+
+
+ private class StorageEngineEntry{
+ StorageEngineConfig config;
+ SchemaProvider provider;
+
+
+ public StorageEngineEntry(StorageEngineConfig config, SchemaProvider provider) {
+ super();
+ this.config = config;
+ this.provider = provider;
+ }
+
+ public StorageEngineConfig getConfig() {
+ return config;
+ }
+ public SchemaProvider getProvider() {
+ return provider;
+ }
+
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
new file mode 100644
index 000000000..818e892c9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import net.hydromatic.optiq.jdbc.ConnectionConfig;
+import net.hydromatic.optiq.tools.Frameworks;
+import net.hydromatic.optiq.tools.Planner;
+import net.hydromatic.optiq.tools.RelConversionException;
+import net.hydromatic.optiq.tools.RuleSet;
+import net.hydromatic.optiq.tools.ValidationException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.FunctionRegistry;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.exec.planner.logical.DrillImplementor;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.sql.SqlExplain;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlLiteral;
+import org.eigenbase.sql.SqlNode;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+import org.eigenbase.sql.parser.SqlParseException;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+public class DrillSqlWorker {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+
+ private final FunctionRegistry registry;
+ private final Planner planner;
+
+ public DrillSqlWorker(DrillSchemaFactory schemaFactory, FunctionRegistry functionRegistry) throws Exception {
+ this.registry = functionRegistry;
+ this.planner = Frameworks.getPlanner(ConnectionConfig.Lex.MYSQL, schemaFactory, SqlStdOperatorTable.instance(), new RuleSet[]{DrillRuleSets.DRILL_BASIC_RULES});
+
+ }
+
+
+ public LogicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException{
+ SqlNode sqlNode = planner.parse(sql);
+
+ ResultMode resultMode = ResultMode.EXEC;
+ if(sqlNode.getKind() == SqlKind.EXPLAIN){
+ SqlExplain explain = (SqlExplain) sqlNode;
+ sqlNode = explain.operands[0];
+ SqlLiteral op = (SqlLiteral) explain.operands[2];
+ SqlExplain.Depth depth = (SqlExplain.Depth) op.getValue();
+ switch(depth){
+ case Logical:
+ resultMode = ResultMode.LOGICAL;
+ break;
+ case Physical:
+ resultMode = ResultMode.PHYSICAL;
+ break;
+ default:
+ }
+
+
+ }
+
+ SqlNode validatedNode = planner.validate(sqlNode);
+ RelNode relNode = planner.convert(validatedNode);
+ RelNode convertedRelNode = planner.transform(0, planner.getEmptyTraitSet().plus(DrillRel.CONVENTION), relNode);
+ if(convertedRelNode instanceof DrillStoreRel){
+ throw new UnsupportedOperationException();
+ }else{
+ convertedRelNode = new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), convertedRelNode);
+ }
+ DrillImplementor implementor = new DrillImplementor(new DrillParseContext(registry), resultMode);
+ implementor.go( (DrillRel) convertedRelNode);
+ planner.close();
+ planner.reset();
+ return implementor.getPlan();
+
+ }
+ // ad-hoc developer smoke test; exercises getPlan() against local sample files
+ private void x() throws Exception {
+ String sqlAgg = "select a, count(1) from parquet.`/Users/jnadeau/region.parquet` group by a";
+ String sql = "select * from parquet.`/Users/jnadeau/region.parquet`";
+
+
+ System.out.println(sql);
+ System.out.println(getPlan(sql).toJsonString(DrillConfig.create()));
+ System.out.println("///////////");
+ System.out.println(sqlAgg);
+ System.out.println(getPlan(sqlAgg).toJsonString(DrillConfig.create()));
+ }
+
+ public static void main(String[] args) throws Exception {
+ DrillConfig config = DrillConfig.create();
+
+ String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+ StorageEngines engines = config.getMapper().readValue(enginesData, StorageEngines.class);
+ FunctionRegistry fr = new FunctionRegistry(config);
+ DrillSchemaFactory schemaFactory = new DrillSchemaFactory(engines, config);
+ DrillSqlWorker worker = new DrillSqlWorker(schemaFactory, fr);
+ worker.x();
+ }
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java
new file mode 100644
index 000000000..e348bc331
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A special type of concurrent map that attempts to create an object before reporting that the key does not exist.
+ * It provides the same behavior on its key set.
+ * @param <KEY> The type of the map's keys.
+ * @param <VALUE> The type of the map's values.
+ */
+public class ExpandingConcurrentMap<KEY, VALUE> implements ConcurrentMap<KEY, VALUE> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpandingConcurrentMap.class);
+
+ private final ConcurrentMap<KEY, VALUE> internalMap = Maps.newConcurrentMap();
+ private final DelegatingKeySet keySet = new DelegatingKeySet();
+ private final MapValueFactory<KEY, VALUE> fac;
+
+ /**
+ * Create a new ExpandingConcurrentMap.
+ * @param fac The object factory responsible for attempting to generate object instances.
+ */
+ public ExpandingConcurrentMap(MapValueFactory<KEY, VALUE> fac) {
+ super();
+ this.fac = fac;
+ }
+
+ @Override
+ public int size() {
+ return internalMap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return internalMap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object k) {
+ @SuppressWarnings("unchecked") KEY key = (KEY) k;
+
+ if(internalMap.containsKey(key)) return true;
+ VALUE v = getNewEntry(k);
+ return v != null;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VALUE get(Object key) {
+ VALUE out = internalMap.get(key);
+ if(out != null) return out;
+ return getNewEntry(key);
+ }
+
+ private VALUE getNewEntry(Object k){
+ @SuppressWarnings("unchecked")
+ KEY key = (KEY) k;
+ VALUE v = this.fac.create(key);
+ if(v == null) return null;
+ VALUE old = internalMap.putIfAbsent(key, v);
+ if(old == null) return v;
+ fac.destroy(v);
+ return old;
+ }
+
+ @Override
+ public VALUE put(KEY key, VALUE value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VALUE remove(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putAll(Map<? extends KEY, ? extends VALUE> m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<KEY> keySet() {
+ return this.keySet;
+ }
+
+ @Override
+ public Collection<VALUE> values() {
+ return internalMap.values();
+ }
+
+ @Override
+ public Set<java.util.Map.Entry<KEY, VALUE>> entrySet() {
+ return internalMap.entrySet();
+ }
+
+ @Override
+ public VALUE putIfAbsent(KEY key, VALUE value) {
+ throw new UnsupportedOperationException();
+ }
+
+ // like the other mutators above, these conditional operations are unsupported rather than silent no-ops
+ @Override
+ public boolean remove(Object key, Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean replace(KEY key, VALUE oldValue, VALUE newValue) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VALUE replace(KEY key, VALUE value) {
+ throw new UnsupportedOperationException();
+ }
+
+ private class DelegatingKeySet implements Set<KEY>{
+
+ @Override
+ public int size() {
+ return ExpandingConcurrentMap.this.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return ExpandingConcurrentMap.this.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return ExpandingConcurrentMap.this.containsKey(o);
+ }
+
+ @Override
+ public Iterator<KEY> iterator() {
+ return internalMap.keySet().iterator();
+ }
+
+ @Override
+ public Object[] toArray() {
+ return internalMap.keySet().toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ return internalMap.keySet().toArray(a);
+ }
+
+ @Override
+ public boolean add(KEY e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for(Object o : c){
+ if(!this.contains(o)) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends KEY> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public interface MapValueFactory<KEY, VALUE> {
+
+ public VALUE create(KEY key);
+ public void destroy(VALUE value);
+ }
+}
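
A minimal sketch of a MapValueFactory, assuming a hypothetical cache of computed values ("LengthCache" is not part of this patch): create() runs on the first lookup of a key, returning null marks the key as absent, and the copy that loses a concurrent putIfAbsent race is handed back to destroy().

    import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
    import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap.MapValueFactory;

    public class LengthCache implements MapValueFactory<String, Integer> {
      private final ExpandingConcurrentMap<String, Integer> map =
          new ExpandingConcurrentMap<String, Integer>(this);

      @Override
      public Integer create(String key) {
        // null signals "no such entry"; the map will report the key as absent
        return key.isEmpty() ? null : key.length();
      }

      @Override
      public void destroy(Integer value) {
        // release any resources held by an instance that lost the insertion race
      }

      public Integer lookup(String key) {
        return map.get(key); // a miss triggers create() and caches the result
      }
    }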
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java
new file mode 100644
index 000000000..ac29fc3ef
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.TableFunction;
+
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaProvider;
+
+public class FileSystemSchema implements Schema, ExpandingConcurrentMap.MapValueFactory<String, DrillTable>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchema.class);
+
+ private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(this);
+
+ private final SchemaPlus parentSchema;
+ private final String name;
+ private final Expression expression = new DefaultExpression(Object.class);
+ private final SchemaProvider schemaProvider;
+ private final StorageEngineConfig config;
+
+ public FileSystemSchema(StorageEngineConfig config, SchemaProvider schemaProvider, SchemaPlus parentSchema, String name) {
+ super();
+ this.parentSchema = parentSchema;
+ this.name = name;
+ this.schemaProvider = schemaProvider;
+ this.config = config;
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public Expression getExpression() {
+ return expression;
+ }
+
+ @Override
+ public Collection<TableFunction> getTableFunctions(String name) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public SchemaPlus getParentSchema() {
+ return parentSchema;
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return tables.keySet();
+ }
+
+ @Override
+ public Set<String> getTableFunctionNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public boolean isMutable() {
+ return true;
+ }
+
+ @Override
+ public DrillTable getTable(String name) {
+ return tables.get(name);
+ }
+
+ @Override
+ public DrillTable create(String key) {
+ Object selection = schemaProvider.getSelectionBaseOnName(key);
+ if(selection == null) return null;
+
+ return new DrillTable(name, this.name, selection, config);
+ }
+
+ @Override
+ public void destroy(DrillTable value) {
+ }
+
+
+
+}
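
A brief sketch of how table resolution flows through this class ("TableLookupExample" is illustrative): getTable() defers to the ExpandingConcurrentMap, which calls create(), which asks the SchemaProvider whether the name resolves to a selection; a null selection means the table does not exist.

    import org.apache.drill.exec.planner.logical.DrillTable;
    import org.apache.drill.exec.planner.sql.FileSystemSchema;

    public class TableLookupExample {
      // Returns null when the underlying SchemaProvider cannot resolve the name.
      public static DrillTable lookup(FileSystemSchema schema, String tableName) {
        return schema.getTable(tableName); // lazily created and cached on first use
      }
    }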
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
new file mode 100644
index 000000000..d4aabb4a1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.torel;
+
+import java.util.List;
+import java.util.Map;
+
+import net.hydromatic.optiq.prepare.Prepare;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillFilterRel;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillLimitRel;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillSortRel;
+import org.apache.drill.exec.planner.logical.DrillUnionRel;
+import org.apache.drill.exec.planner.logical.ScanFieldDeterminer;
+import org.apache.drill.exec.planner.logical.ScanFieldDeterminer.FieldList;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelOptTable.ToRelContext;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexNode;
+
+public class ConversionContext implements ToRelContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConversionContext.class);
+
+ private static final ConverterVisitor VISITOR = new ConverterVisitor();
+
+ private final Map<Scan, FieldList> scanFieldLists;
+ private final RelOptCluster cluster;
+ private final Prepare prepare;
+
+ public ConversionContext(RelOptCluster cluster, LogicalPlan plan) {
+ super();
+ scanFieldLists = ScanFieldDeterminer.getFieldLists(plan);
+ this.cluster = cluster;
+ this.prepare = null;
+ }
+
+ @Override
+ public RelOptCluster getCluster() {
+ return cluster;
+ }
+
+
+ private FieldList getFieldList(Scan scan) {
+ assert scanFieldLists.containsKey(scan);
+ return scanFieldLists.get(scan);
+ }
+
+
+ public RexBuilder getRexBuilder(){
+ return cluster.getRexBuilder();
+ }
+
+ public RelTraitSet getLogicalTraits(){
+ RelTraitSet set = RelTraitSet.createEmpty();
+ set.add(DrillRel.CONVENTION);
+ return set;
+ }
+
+ public RelNode toRel(LogicalOperator operator) throws InvalidRelException{
+ return operator.accept(VISITOR, this);
+ }
+
+ public RexNode toRex(LogicalExpression e){
+ return null;
+ }
+
+ public RelDataTypeFactory getTypeFactory(){
+ return cluster.getTypeFactory();
+ }
+
+ public RelOptTable getTable(Scan scan){
+ FieldList list = getFieldList(scan);
+
+ return null;
+ }
+
+ @Override
+ public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class ConverterVisitor extends AbstractLogicalVisitor<RelNode, ConversionContext, InvalidRelException>{
+
+ @Override
+ public RelNode visitScan(Scan scan, ConversionContext context){
+ return DrillScanRel.convert(scan, context);
+ }
+
+ @Override
+ public RelNode visitFilter(Filter filter, ConversionContext context) throws InvalidRelException{
+ return DrillFilterRel.convert(filter, context);
+ }
+
+ @Override
+ public RelNode visitProject(Project project, ConversionContext context) throws InvalidRelException{
+ return DrillProjectRel.convert(project, context);
+ }
+
+ @Override
+ public RelNode visitOrder(Order order, ConversionContext context) throws InvalidRelException{
+ return DrillSortRel.convert(order, context);
+ }
+
+ @Override
+ public RelNode visitJoin(Join join, ConversionContext context) throws InvalidRelException{
+ return DrillJoinRel.convert(join, context);
+ }
+
+ @Override
+ public RelNode visitLimit(Limit limit, ConversionContext context) throws InvalidRelException{
+ return DrillLimitRel.convert(limit, context);
+ }
+
+ @Override
+ public RelNode visitUnion(Union union, ConversionContext context) throws InvalidRelException{
+ return DrillUnionRel.convert(union, context);
+ }
+
+ @Override
+ public RelNode visitGroupingAggregate(GroupingAggregate groupBy, ConversionContext context)
+ throws InvalidRelException {
+ return DrillAggregateRel.convert(groupBy, context);
+ }
+
+ }
+
+
+
+}
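
A hedged sketch of driving this context by hand, assuming a RelOptCluster obtained from the Optiq planner (constructing one is outside this patch; "ToRelExample" is hypothetical): the plan's single root operator is converted into a Drill RelNode tree.

    import org.apache.drill.common.logical.LogicalPlan;
    import org.apache.drill.common.logical.data.LogicalOperator;
    import org.apache.drill.exec.planner.torel.ConversionContext;
    import org.eigenbase.rel.InvalidRelException;
    import org.eigenbase.rel.RelNode;
    import org.eigenbase.relopt.RelOptCluster;

    public class ToRelExample {
      // Assumes a single-root plan, matching ScanFieldDeterminer's precondition.
      public static RelNode convertRoot(RelOptCluster cluster, LogicalPlan plan) throws InvalidRelException {
        ConversionContext context = new ConversionContext(cluster, plan);
        LogicalOperator root = plan.getGraph().getRoots().iterator().next();
        return context.toRel(root);
      }
    }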
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 991ade6fe..ae21a3cde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -36,6 +36,15 @@ public class BatchSchema implements Iterable<MaterializedField> {
return new SchemaBuilder();
}
+ public int getFieldCount(){
+ return fields.size();
+ }
+
+ public MaterializedField getColumn(int index){
+ if(index < 0 || index >= fields.size()) return null;
+ return fields.get(index);
+ }
+
@Override
public Iterator<MaterializedField> iterator() {
return fields.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 8d952e32f..6e764a886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -138,17 +138,18 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
@Override
- @SuppressWarnings("unchecked")
public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
VectorWrapper<?> va = wrappers.get(fieldId);
- assert va != null;
- if (va.getVectorClass() != clazz) {
+ if (va != null && clazz == null) {
+ return va;
+ }
+ if (va != null && va.getVectorClass() != clazz) {
logger.warn(String.format(
"Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
return null;
}
- return (VectorWrapper) va;
+ return (VectorWrapper<?>) va;
}
public BatchSchema getSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 4a43a9948..1d9fecfd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -104,7 +104,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
- SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
}
@@ -201,6 +201,10 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
+ public void setAutoRead(boolean enableAutoRead){
+ connection.setAutoRead(enableAutoRead);
+ }
+
public void close() {
logger.debug("Closing client");
connection.getChannel().close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index a0b5cfd65..c665949a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
@@ -48,10 +49,10 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
@Override
protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- return handle(rpcType, pBody, dBody);
+ return handleResponse((ConnectionThrottle) connection, rpcType, pBody, dBody);
}
- protected abstract Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
+ protected abstract Response handleResponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException;
@Override
public ServerConnection initRemoteConnection(Channel channel) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 0eaade8f8..a19f8d833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -22,8 +22,9 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-public abstract class RemoteConnection{
+public abstract class RemoteConnection implements ConnectionThrottle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
private final WriteManager writeManager;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
new file mode 100644
index 000000000..e2ffcc0bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+public interface ConnectionThrottle {
+ public void setAutoRead(boolean enableAutoRead);
+}
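
A sketch of how this throttle threads through the listener API changed below, with an illustrative listener ("ThrottlingListener" is not part of this patch) that applies backpressure: auto-read is disabled while a batch is being consumed, then re-enabled. Import paths are assumed from this change set.

    import org.apache.drill.exec.proto.UserBitShared.QueryId;
    import org.apache.drill.exec.rpc.RpcException;
    import org.apache.drill.exec.rpc.user.ConnectionThrottle;
    import org.apache.drill.exec.rpc.user.QueryResultBatch;
    import org.apache.drill.exec.rpc.user.UserResultsListener;

    public class ThrottlingListener implements UserResultsListener {
      @Override
      public void queryIdArrived(QueryId queryId) { }

      @Override
      public void submissionFailed(RpcException ex) { }

      @Override
      public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
        throttle.setAutoRead(false); // stop the socket from buffering further batches
        try {
          // consume the batch here
        } finally {
          throttle.setAutoRead(true); // resume reading once the batch is handled
        }
      }
    }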
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5b4a504da..5345b31a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -51,7 +51,7 @@ public class QueryResultHandler {
return new SubmissionListener(listener);
}
- public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException {
final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
final QueryResultBatch batch = new QueryResultBatch(result, dBody);
UserResultsListener l = resultsListener.get(result.getQueryId());
@@ -72,7 +72,7 @@ public class QueryResultHandler {
l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
resultsListener.remove(result.getQueryId(), l);
}else{
- l.resultArrived(batch);
+ l.resultArrived(batch, throttle);
}
if (
@@ -100,13 +100,13 @@ public class QueryResultHandler {
private volatile boolean finished = false;
private volatile RpcException ex;
private volatile UserResultsListener output;
-
+ private volatile ConnectionThrottle throttle;
public boolean transferTo(UserResultsListener l) {
synchronized (this) {
output = l;
boolean last = false;
for (QueryResultBatch r : results) {
- l.resultArrived(r);
+ l.resultArrived(r, throttle);
last = r.getHeader().getIsLastChunk();
}
if(ex != null){
@@ -119,14 +119,15 @@ public class QueryResultHandler {
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+ this.throttle = throttle;
if(result.getHeader().getIsLastChunk()) finished = true;
synchronized (this) {
if (output == null) {
this.results.add(result);
} else {
- output.resultArrived(result);
+ output.resultArrived(result, throttle);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index f28ff4b81..8b1bdecd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -70,10 +70,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
- protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ protected Response handleResponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
switch (rpcType) {
case RpcType.QUERY_RESULT_VALUE:
- queryResultHandler.batchArrived(pBody, dBody);
+ queryResultHandler.batchArrived(throttle, pBody, dBody);
return new Response(RpcType.ACK, Ack.getDefaultInstance());
default:
throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3bcd0cf4b..71f59948e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,6 +24,6 @@ public interface UserResultsListener {
public abstract void queryIdArrived(QueryId queryId);
public abstract void submissionFailed(RpcException ex);
- public abstract void resultArrived(QueryResultBatch result);
+ public abstract void resultArrived(QueryResultBatch result, ConnectionThrottle throttle);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 0f45dbee7..aba9ab702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -94,7 +94,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
default:
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 39821e39f..60a7d5c83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -23,12 +23,16 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FunctionRegistry;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.planner.sql.DrillSchemaFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -37,7 +41,9 @@ import org.apache.drill.exec.store.StorageEngine;
import org.apache.drill.exec.store.StorageEngineRegistry;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.io.Resources;
public class DrillbitContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -53,6 +59,9 @@ public class DrillbitContext {
private final OperatorCreatorRegistry operatorCreatorRegistry;
private final Controller controller;
private final WorkEventBus workBus;
+ private final FunctionImplementationRegistry functionRegistry;
+ private final FunctionRegistry functionRegistryX; // logical-expression registry, distinct from the implementation registry above
+ private final DrillSchemaFactory factory;
public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
super();
@@ -70,8 +79,34 @@ public class DrillbitContext {
this.storageEngineRegistry = new StorageEngineRegistry(this);
this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry);
this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
+ this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+
+ DrillSchemaFactory factory = null;
+ try{
+ String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+ StorageEngines engines = context.getConfig().getMapper().readValue(enginesData, StorageEngines.class);
+ factory = new DrillSchemaFactory(engines, context.getConfig());
+ }catch(Exception e){
+ logger.error("Failure reading storage engines data. Creating empty list of schemas.", e);
+ factory = DrillSchemaFactory.createEmpty();
+ }
+ this.factory = factory;
+ this.functionRegistryX = new FunctionRegistry(context.getConfig());
+
}
+ public DrillSchemaFactory getSchemaFactory(){
+ return factory;
+ }
+
+ public FunctionRegistry getFunctionRegistry(){
+ return functionRegistryX;
+ }
+
+ public FunctionImplementationRegistry getFunctionImplementationRegistry() {
+ return functionRegistry;
+ }
+
public WorkEventBus getWorkBus(){
return workBus;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 1756d9673..cb05273b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -116,7 +116,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
calculateEndpointBytes();
}
- public ParquetGroupScan(ArrayList<ReadEntryWithPath> entries,
+ public ParquetGroupScan(List<ReadEntryWithPath> entries,
ParquetStorageEngine storageEngine, FieldReference ref) throws IOException {
this.storageEngine = storageEngine;
this.engineConfig = storageEngine.getEngineConfig();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
index 86be49e32..c17a9e356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
@@ -52,7 +52,7 @@ public class ParquetSchemaProvider implements SchemaProvider{
@Override
public Object getSelectionBaseOnName(String tableName) {
try{
-// if(!fs.exists(new Path(tableName))) return null;
+ if(!fs.exists(new Path(tableName))) return null;
ReadEntryWithPath re = new ReadEntryWithPath(tableName);
return Lists.newArrayList(re);
}catch(Exception e){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
index e04d3e969..45b9cc1b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -99,7 +99,7 @@ public class ParquetStorageEngine extends AbstractStorageEngine{
ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
new TypeReference<ArrayList<ReadEntryWithPath>>() {});
- return new ParquetGroupScan(readEntries, this, scan.getOutputReference());
+ return new ParquetGroupScan(Lists.newArrayList(readEntries), this, scan.getOutputReference());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index cbc8b86c7..4f8cd3373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -167,6 +167,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
return ((data.getByte((int) Math.floor(index / 8)) & (int) Math.pow(2, (index % 8))) == 0) ? 0 : 1;
}
+ public boolean isNull(int index){
+ return false; // bits are non-nullable, so no position is ever null
+ }
+
@Override
public final Object getObject(int index) {
return new Boolean(get(index) != 0);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 4e5364a91..a3d3a8a9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -125,6 +125,8 @@ public interface ValueVector extends Closeable {
public abstract Object getObject(int index);
public int getValueCount();
+
+ public boolean isNull(int index);
public void reset();
}
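
A small sketch of what the new isNull(int) enables ("VectorPrinter" is illustrative): generic readers can distinguish SQL NULL from a concrete value without knowing the vector's concrete type.

    import org.apache.drill.exec.vector.ValueVector;

    public class VectorPrinter {
      // Prints each value, or NULL where the vector reports a null slot.
      public static void print(ValueVector vector) {
        for (int i = 0; i < vector.getValueCount(); i++) {
          System.out.println(vector.isNull(i) ? "NULL" : vector.getObject(i));
        }
      }
    }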
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java
new file mode 100644
index 000000000..93089e7b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+abstract class AbstractSqlAccessor implements SqlAccessor {
+
+ @Override
+ public abstract boolean isNull(int index);
+
+ @Override
+ public BigDecimal getBigDecimal(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("BigDecimal");
+ }
+
+ @Override
+ public boolean getBoolean(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("boolean");
+ }
+
+ @Override
+ public byte getByte(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("byte");
+ }
+
+ @Override
+ public byte[] getBytes(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("byte[]");
+ }
+
+ @Override
+ public Date getDate(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Date");
+ }
+
+ @Override
+ public double getDouble(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("double");
+ }
+
+ @Override
+ public float getFloat(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("float");
+ }
+
+ @Override
+ public int getInt(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("int");
+ }
+
+ @Override
+ public long getLong(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("long");
+ }
+
+ @Override
+ public short getShort(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("short");
+ }
+
+ @Override
+ public InputStream getStream(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("InputStream");
+ }
+
+ @Override
+ public char getChar(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Char");
+ }
+
+ @Override
+ public Reader getReader(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Reader");
+ }
+
+ @Override
+ public String getString(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("String");
+ }
+
+ @Override
+ public Time getTime(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Time");
+ }
+
+ @Override
+ public Timestamp getTimestamp(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Timestamp");
+ }
+
+ abstract MajorType getType();
+
+
+ public class InvalidAccessException extends SQLException{
+ public InvalidAccessException(String name){
+ super(String.format("Requesting class of type %s for an object of type %s:%s is not allowed.", name, getType().getMinorType().name(), getType().getMode().name()));
+ }
+ }
+}
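
A hedged sketch of a concrete accessor built on this base class, using the BitVector touched by this patch; the generated SqlAccessors template in this change produces accessors along these lines, but "BitSqlAccessor" itself is hypothetical. It must live in the same package because AbstractSqlAccessor is package-private.

    package org.apache.drill.exec.vector.accessor;

    import org.apache.drill.common.types.TypeProtos.MajorType;
    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.vector.BitVector;

    class BitSqlAccessor extends AbstractSqlAccessor {
      private final BitVector vector;

      BitSqlAccessor(BitVector vector) {
        this.vector = vector;
      }

      @Override
      public boolean isNull(int index) {
        return vector.isNull(index); // BIT is non-nullable, so this is always false
      }

      @Override
      public boolean getBoolean(int index) {
        return vector.get(index) != 0; // the vector stores bits as 0/1 ints
      }

      @Override
      public Object getObject(int index) {
        return vector.getObject(index);
      }

      @Override
      MajorType getType() {
        return Types.required(MinorType.BIT);
      }
    }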
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
new file mode 100644
index 000000000..56113540f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import org.apache.drill.exec.vector.accessor.AbstractSqlAccessor.InvalidAccessException;
+
+public interface SqlAccessor {
+
+ public abstract boolean isNull(int index);
+
+ public abstract BigDecimal getBigDecimal(int index) throws InvalidAccessException;
+
+ public abstract boolean getBoolean(int index) throws InvalidAccessException;
+
+ public abstract byte getByte(int index) throws InvalidAccessException;
+
+ public abstract byte[] getBytes(int index) throws InvalidAccessException;
+
+ public abstract Date getDate(int index) throws InvalidAccessException;
+
+ public abstract double getDouble(int index) throws InvalidAccessException;
+
+ public abstract float getFloat(int index) throws InvalidAccessException;
+
+ public abstract char getChar(int index) throws InvalidAccessException;
+
+ public abstract int getInt(int index) throws InvalidAccessException;
+
+ public abstract long getLong(int index) throws InvalidAccessException;
+
+ public abstract short getShort(int index) throws InvalidAccessException;
+
+ public abstract InputStream getStream(int index) throws InvalidAccessException;
+
+ public abstract Reader getReader(int index) throws InvalidAccessException;
+
+ public abstract String getString(int index) throws InvalidAccessException;
+
+ public abstract Time getTime(int index) throws InvalidAccessException;
+
+ public abstract Timestamp getTimestamp(int index) throws InvalidAccessException;
+
+ public abstract Object getObject(int index) throws InvalidAccessException;
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 29f011f62..b6d441cff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -119,6 +119,7 @@ public class WorkManager implements Closeable{
public class WorkerBee{
public void addFragmentRunner(FragmentExecutor runner){
+ logger.debug("Adding pending task {}", runner);
pendingTasks.add(runner);
}
@@ -167,7 +168,7 @@ public class WorkManager implements Closeable{
try {
while(true){
// logger.debug("Polling for pending work tasks.");
- Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS);
+ Runnable r = pendingTasks.take();
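+ // take() blocks until a task is available, replacing the 10 second poll loop.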
if(r != null){
logger.debug("Starting pending task {}", r);
executor.execute(r);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e4f8e7b9c..329815d20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,43 +17,58 @@
*/
package org.apache.drill.exec.work.foreman;
+import io.netty.buffer.ByteBuf;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.util.AtomicState;
+import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.exec.work.ErrorHelper;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
@@ -151,6 +166,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
case LOGICAL:
parseAndRunLogicalPlan(queryRequest.getPlan());
+
break;
case PHYSICAL:
parseAndRunPhysicalPlan(queryRequest.getPlan());
@@ -170,8 +186,18 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
try {
LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+
+ if(logicalPlan.getProperties().resultMode == ResultMode.LOGICAL){
+ fail("Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC.", new Exception());
+ }
if(logger.isDebugEnabled()) logger.debug("Logical {}", logicalPlan.unparse(context.getConfig()));
PhysicalPlan physicalPlan = convert(logicalPlan);
+
+ if(logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL){
+ returnPhysical(physicalPlan);
+ return;
+ }
+
if(logger.isDebugEnabled()) logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan));
runPhysicalPlan(physicalPlan);
} catch (IOException e) {
@@ -181,6 +207,74 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
+
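+ // For LOGICAL and PHYSICAL result modes, the plan itself is serialized and returned to the client instead of being executed.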
+ private void returnLogical(LogicalPlan plan){
+ String jsonPlan = plan.toJsonStringSafe(context.getConfig());
+ sendSingleString("logical", jsonPlan);
+ }
+
+ private void returnPhysical(PhysicalPlan plan){
+ String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
+ sendSingleString("physical", jsonPlan);
+ }
+
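+ // Sends the given string as a single-row, single-column VARCHAR batch, followed by an empty last-chunk batch.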
+ private void sendSingleString(String columnName, String value){
+ MaterializedField f = MaterializedField.create(new SchemaPath(columnName, ExpressionPosition.UNKNOWN), Types.required(MinorType.VARCHAR));
+ VarCharVector vector = new VarCharVector(f, bee.getContext().getAllocator());
+ byte[] bytes = value.getBytes(Charsets.UTF_8);
+ vector.allocateNew(bytes.length, 1);
+ vector.getMutator().set(0, bytes);
+ vector.getMutator().setValueCount(1);
+ QueryResult header = QueryResult.newBuilder() //
+ .setQueryId(context.getQueryId()) //
+ .setRowCount(1) //
+ .setDef(RecordBatchDef.newBuilder().addField(vector.getMetadata()).build()) //
+ .setIsLastChunk(false) //
+ .build();
+ QueryWritableBatch b1 = new QueryWritableBatch(header, vector.getBuffers());
+ vector.close();
+
+ QueryResult header2 = QueryResult.newBuilder() //
+ .setQueryId(context.getQueryId()) //
+ .setRowCount(0) //
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ QueryWritableBatch b2 = new QueryWritableBatch(header2);
+
+ SingleListener l = new SingleListener();
+ this.initiatingClient.sendResult(l, b1);
+ this.initiatingClient.sendResult(l, b2);
+ l.acct.waitForSendComplete();
+
+ }
+
+ class SingleListener implements RpcOutcomeListener<Ack>{
+
+ final SendingAccountor acct;
+
+ public SingleListener(){
+ acct = new SendingAccountor();
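+ // Two sends are outstanding: the data batch and the terminating last-chunk batch.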
+ acct.increment();
+ acct.increment();
+ }
+
+ @Override
+ public void failed(RpcException ex) {
+ acct.decrement();
+ fail("Failure while sending single result.", ex);
+ }
+
+ @Override
+ public void success(Ack value, ByteBuf buffer) {
+ acct.decrement();
+ }
+
+ }
+
private void parseAndRunPhysicalPlan(String json) {
try {
PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -192,7 +286,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
private void runPhysicalPlan(PhysicalPlan plan) {
+ if(plan.getProperties().resultMode != ResultMode.EXEC){
+ fail(String.format("Failure running plan. You requested a result mode of %s, but a physical plan can only be run with a result mode of EXEC.", plan.getProperties().resultMode), new Exception());
+ return;
+ }
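+ // The first operator returned by getSortedOperators(false) is taken as the root of the execution tree.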
PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+
MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
Fragment rootFragment;
try {
@@ -218,18 +316,26 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
List<PlanFragment> intermediateFragments = Lists.newArrayList();
// store fragments in distributed grid.
+ logger.debug("Storing fragments");
for (PlanFragment f : work.getFragments()) {
// store all fragments in grid since they are part of handshake.
context.getCache().storeFragment(f);
if (f.getLeafFragment()) {
leafFragments.add(f);
} else {
intermediateFragments.add(f);
}
}
+ logger.debug("Fragments stored.");
+
+ logger.debug("Submitting fragments to run.");
fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments);
+ logger.debug("Fragments running.");
} catch (ExecutionSetupException | RpcException e) {
@@ -238,11 +344,32 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
- private void runSQL(String json) {
- throw new UnsupportedOperationException();
+ private void runSQL(String sql) {
+ try{
+ DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getSchemaFactory(), context.getFunctionRegistry());
+ LogicalPlan plan = sqlWorker.getPlan(sql);
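+ // Route on the plan's requested result mode: return the logical plan, return the converted physical plan, or execute it.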
+ if(plan.getProperties().resultMode == ResultMode.LOGICAL){
+ returnLogical(plan);
+ return;
+ }
+
+ PhysicalPlan physical = convert(plan);
+
+ if(plan.getProperties().resultMode == ResultMode.PHYSICAL){
+ returnPhysical(physical);
+ return;
+ }
+
+ runPhysicalPlan(physical);
+ }catch(Exception e){
+ fail("Failure while parsing sql.", e);
+ }
}
private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
+ if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index e9302e1fc..eaf921dfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -76,18 +76,25 @@ class QueryManager implements FragmentStatusListener{
}
public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
+ logger.debug("Setting up fragment runs.");
remainingFragmentCount.set(leafFragments.size()+1);
queryId = rootFragment.getHandle().getQueryId();
workBus = bee.getContext().getWorkBus();
// set up the root fragment first so we'll have incoming buffers available.
{
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ logger.debug("Setting up root context.");
+ FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext().getFunctionImplementationRegistry());
+ logger.debug("Setting up incoming buffers");
IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+ logger.debug("Setting buffers on root context.");
rootContext.setBuffers(buffers);
+ logger.debug("Generating Exec tree");
RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+ logger.debug("Exec tree generated.");
// add fragment to local node.
map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+ logger.debug("Fragment added to local node.");
rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment));
RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
@@ -99,6 +106,7 @@ class QueryManager implements FragmentStatusListener{
workBus.setRootFragmentManager(fragmentManager);
}
+
}
// keep track of intermediate fragments (not root or leaf)
@@ -112,6 +120,7 @@ class QueryManager implements FragmentStatusListener{
sendRemoteFragment(f);
}
+ logger.debug("Fragment runs setup is complete.");
}
private void sendRemoteFragment(PlanFragment fragment){
diff --git a/exec/java-exec/src/main/resources/storage-engines.json b/exec/java-exec/src/main/resources/storage-engines.json
new file mode 100644
index 000000000..d1d0413a8
--- /dev/null
+++ b/exec/java-exec/src/main/resources/storage-engines.json
@@ -0,0 +1,29 @@
+{
+ "storage":{
+ "parquet-local" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ },
+ "parquet-cp" :
+ {
+ "type":"parquet",
+ "dfsName" : "classpath:///"
+ },
+ "jsonl" :
+ {
+ "type":"json",
+ "dfsName" : "file:///"
+ },
+ "json-cp" :
+ {
+ "type":"json",
+ "dfsName" : "classpath:///"
+ },
+ "parquet" :
+ {
+ "type":"parquet",
+ "dfsName" : "file:///"
+ }
+ }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index b4dc943ba..454b15f8f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -171,12 +171,7 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
}
ValueVector.Accessor accessor = v.getValueVector().getAccessor();
-
- if (v.getField().getType().getMinorType() == TypeProtos.MinorType.VARCHAR) {
- System.out.println(new String((byte[]) accessor.getObject(r), UTF_8));
- } else {
- System.out.print(accessor.getObject(r));
- }
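+ // getObject() on a VARCHAR accessor now returns a String, so the byte[] special case is no longer needed.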
+ System.out.print(accessor.getObject(r));
}
if (!first) System.out.println();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index bffc42765..2be3e8d7c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -145,10 +145,10 @@ public class JSONRecordReaderTest {
assertEquals(3, addFields.size());
assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
assertField(addFields.get(1), 0, MinorType.BIT, true, "b");
- assertField(addFields.get(2), 0, MinorType.VARCHAR, "hi!".getBytes(UTF_8), "c");
+ assertField(addFields.get(2), 0, MinorType.VARCHAR, "hi!", "c");
assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
assertField(addFields.get(1), 1, MinorType.BIT, false, "b");
- assertField(addFields.get(2), 1, MinorType.VARCHAR, "drill!".getBytes(UTF_8), "c");
+ assertField(addFields.get(2), 1, MinorType.VARCHAR, "drill!", "c");
assertEquals(0, jr.next());
assertTrue(mutator.getRemovedFields().isEmpty());
@@ -178,19 +178,19 @@ public class JSONRecordReaderTest {
assertField(addFields.get(1), 0, MinorType.INT, 1, "b");
assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 2.15, "c");
assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
- assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1".getBytes(UTF_8), "str1");
+ assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1", "str1");
assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
assertField(addFields.get(1), 1, MinorType.INT, 3, "b");
assertField(addFields.get(3), 1, MinorType.BIT, false, "bool");
- assertField(addFields.get(4), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1");
+ assertField(addFields.get(4), 1, MinorType.VARCHAR, "test2", "str1");
assertField(addFields.get(5), 1, MinorType.INT, 4, "d");
assertField(addFields.get(0), 2, MinorType.INT, 12345, "test");
assertField(addFields.get(2), 2, MinorType.FLOAT4, (float) 5.16, "c");
assertField(addFields.get(3), 2, MinorType.BIT, true, "bool");
assertField(addFields.get(5), 2, MinorType.INT, 6, "d");
- assertField(addFields.get(6), 2, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
+ assertField(addFields.get(6), 2, MinorType.VARCHAR, "test3", "str2");
assertTrue(mutator.getRemovedFields().isEmpty());
assertEquals(0, jr.next());
}
@@ -220,14 +220,14 @@ public class JSONRecordReaderTest {
assertField(addFields.get(1), 0, MinorType.INT, 1, "b");
assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 2.15, "c");
assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
- assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1".getBytes(UTF_8), "str1");
+ assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1", "str1");
assertTrue(removedFields.isEmpty());
assertEquals(1, jr.next());
assertEquals(6, addFields.size());
assertField(addFields.get(0), 0, MinorType.INT, 1234, "test");
assertField(addFields.get(1), 0, MinorType.INT, 3, "b");
assertField(addFields.get(3), 0, MinorType.BIT, false, "bool");
- assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1");
+ assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2", "str1");
assertField(addFields.get(5), 0, MinorType.INT, 4, "d");
assertEquals(1, removedFields.size());
assertEquals("c", removedFields.get(0).getName());
@@ -238,7 +238,7 @@ public class JSONRecordReaderTest {
assertField(addFields.get(3), 0, MinorType.BIT, true, "bool");
assertField(addFields.get(5), 0, MinorType.INT, 6, "d");
assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 5.16, "c");
- assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2");
+ assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3", "str2");
assertEquals(2, removedFields.size());
Iterables.find(removedFields, new Predicate<MaterializedField>() {
@Override
@@ -274,10 +274,10 @@ public class JSONRecordReaderTest {
assertEquals(2, jr.next());
assertEquals(3, addFields.size());
assertField(addFields.get(0), 0, MinorType.INT, 123, "test");
- assertField(addFields.get(1), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "a.b");
+ assertField(addFields.get(1), 0, MinorType.VARCHAR, "test", "a.b");
assertField(addFields.get(2), 0, MinorType.BIT, true, "a.a.d");
assertField(addFields.get(0), 1, MinorType.INT, 1234, "test");
- assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "a.b");
+ assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2", "a.b");
assertField(addFields.get(2), 1, MinorType.BIT, false, "a.a.d");
assertEquals(0, jr.next());
@@ -308,7 +308,7 @@ public class JSONRecordReaderTest {
assertField(addFields.get(3), 0, MinorType.INT, Arrays.asList(7, 8, 9), "test3.b");
assertField(addFields.get(4), 0, MinorType.INT, Arrays.asList(10, 11, 12), "test3.c.d");
assertField(addFields.get(5), 0, MinorType.FLOAT4, Arrays.<Float>asList((float) 1.1, (float) 1.2, (float) 1.3), "testFloat");
- assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello".getBytes(UTF_8), "drill".getBytes(UTF_8)), "testStr");
+ assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello", "drill"), "testStr");
assertField(addFields.get(1), 1, MinorType.INT, Arrays.asList(1, 2), "test2");
assertField(addFields.get(2), 1, MinorType.INT, Arrays.asList(7, 7, 7, 8), "test3.a");
assertField(addFields.get(5), 1, MinorType.FLOAT4, Arrays.<Float>asList((float) 2.2, (float) 2.3,(float) 2.4), "testFloat");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
index 733cb1dd3..8efc7622d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java
@@ -17,10 +17,13 @@
*/
package org.apache.drill.exec.store;
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import com.google.common.util.concurrent.SettableFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.column.Encoding.PLAIN;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.types.TypeProtos;
@@ -34,6 +37,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
@@ -54,10 +58,10 @@ import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
-import java.util.*;
-
-import static org.junit.Assert.*;
-import static parquet.column.Encoding.PLAIN;
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.SettableFuture;
public class ParquetRecordReaderTest {
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
@@ -368,7 +372,7 @@ public class ParquetRecordReaderTest {
}
@Override
- public void resultArrived(QueryResultBatch result) {
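+ // The UserResultsListener callback now receives a ConnectionThrottle; it is unused in this test listener.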
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
logger.debug("result arrived in test batch listener.");
if(result.getHeader().getIsLastChunk()){
future.set(null);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
index b765ed00d..9c4aeea14 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
@@ -17,9 +17,10 @@
*/
package org.apache.drill.exec.store;
-import com.google.common.base.Charsets;
-import com.google.common.base.Stopwatch;
-import com.google.common.io.Resources;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.client.DrillClient;
@@ -28,6 +29,7 @@ import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
@@ -37,10 +39,9 @@ import org.junit.AfterClass;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
+import com.google.common.io.Resources;
public class TestParquetPhysicalPlan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class);
@@ -93,7 +94,7 @@ public class TestParquetPhysicalPlan {
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount();
System.out.println(String.format("Result batch arrived. Number of records: %d", rows));
count.addAndGet(rows);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index ab29a9f38..52430e132 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
@@ -139,7 +140,7 @@ public class ParquetRecordReaderTest {
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
long columnValCounter = 0;
int i = 0;
FieldInfo currentField;