aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org
diff options
context:
space:
mode:
authorJacques Nadeau <jacques@apache.org>2014-01-30 10:56:27 -0800
committerJacques Nadeau <jacques@apache.org>2014-03-03 23:22:16 -0800
commitb3460af8f3d47ce3a1ac4289998d255a9307b1ea (patch)
tree6edf5228cb23216311b9703a78b3cd0674eea35d /exec/java-exec/src/main/java/org
parentcdf46fd36fdfc2e3029a6b2e077330c665e43c2e (diff)
DRILL-257: Move SQL parsing to server side.
Switch to Avatica based JDBC driver. Update QuerySubmitter to support SQL queries. Update SqlAccesors to support getObject() Remove ref, clean up SQL packages some. Various performance fixes. Updating result set so first set of results must be returned before control is return to client to allow metadata to populate for aggressive tools like sqlline Move timeout functionality to TestTools. Update Expression materializer so that it will return a nullable int if a field is not found. Update Project record batch to support simple wildcard queries. Updates to move JSON record reader test to expecting VarCharVector.getObject to return a String rather than a byte[].
Diffstat (limited to 'exec/java-exec/src/main/java/org')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java112
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java25
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java108
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java53
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java42
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java88
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java165
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java57
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java73
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java59
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java200
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java36
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java98
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java46
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java33
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java92
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java43
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java100
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java107
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java71
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java57
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java39
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java41
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java142
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java217
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java74
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java96
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java123
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java223
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java117
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java167
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java123
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java67
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java131
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java11
79 files changed, 3893 insertions, 207 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 2f0ab2ea4..5f370ff34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -35,6 +35,7 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.DuplicateInstanceNameException;
import com.hazelcast.core.Hazelcast;
@@ -132,6 +133,10 @@ public class HazelCache implements DistributedCache {
@Override
public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
IMap<String, V> imap = this.instance.getMap(clazz.toString());
+ MapConfig myMapConfig = new MapConfig();
+ myMapConfig.setBackupCount(0);
+ myMapConfig.setReadBackupData(true);
+ instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
return new HCDistributedMapImpl<V>(imap, clazz);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 74eed9424..1297cb343 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.Closeable;
@@ -37,11 +35,18 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.QueryType;
-import org.apache.drill.exec.rpc.*;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.ChannelClosedException;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
@@ -52,7 +57,7 @@ import com.google.common.util.concurrent.SettableFuture;
/**
* Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf
*/
-public class DrillClient implements Closeable{
+public class DrillClient implements Closeable, ConnectionThrottle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
DrillConfig config;
@@ -86,17 +91,28 @@ public class DrillClient implements Closeable{
return config;
}
+ @Override
+ public void setAutoRead(boolean enableAutoRead) {
+ client.setAutoRead(enableAutoRead);
+ }
+
+
+
/**
* Connects the client to a Drillbit server
*
* @throws IOException
*/
- public synchronized void connect() throws RpcException {
+ public void connect() throws RpcException {
+ connect((String) null);
+ }
+
+ public synchronized void connect(String connect) throws RpcException {
if (connected) return;
if (clusterCoordinator == null) {
try {
- this.clusterCoordinator = new ZKClusterCoordinator(this.config);
+ this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect);
this.clusterCoordinator.start(10000);
} catch (Exception e) {
throw new RpcException("Failure setting up ZK for client.", e);
@@ -172,6 +188,11 @@ public class DrillClient implements Closeable{
return listener.getResults();
}
+ public void cancelQuery(QueryId id){
+ client.send(RpcType.CANCEL_QUERY, id, Ack.class);
+ }
+
+
/**
* Submits a Logical plan for direct execution (bypasses parsing)
*
@@ -217,7 +238,7 @@ public class DrillClient implements Closeable{
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
// logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
results.add(result);
if(result.getHeader().getIsLastChunk()){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 7adefdb10..b07b3aee7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -17,43 +17,33 @@
*/
package org.apache.drill.exec.client;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.base.Stopwatch;
-import com.google.common.io.Resources;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.StringUtils;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.RemoteRpcException;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.VectorUtil;
-import org.apache.drill.exec.vector.ValueVector;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharsetDecoder;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
public class QuerySubmitter {
@@ -99,42 +89,50 @@ public class QuerySubmitter {
public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception {
DrillConfig config = DrillConfig.create();
- DrillClient client;
- if (local) {
- RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
- Drillbit[] drillbits = new Drillbit[bits];
- for (int i = 0; i < bits; i++) {
- drillbits[i] = new Drillbit(config, serviceSet);
- drillbits[i].run();
+ DrillClient client = null;
+
+ try{
+ if (local) {
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit[] drillbits = new Drillbit[bits];
+ for (int i = 0; i < bits; i++) {
+ drillbits[i] = new Drillbit(config, serviceSet);
+ drillbits[i].run();
+ }
+ client = new DrillClient(config, serviceSet.getCoordinator());
+ } else {
+ ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum);
+ clusterCoordinator.start(10000);
+ client = new DrillClient(config, clusterCoordinator);
}
- client = new DrillClient(config, serviceSet.getCoordinator());
- } else {
- ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum);
- clusterCoordinator.start(10000);
- client = new DrillClient(config, clusterCoordinator);
- }
- client.connect();
- QueryResultsListener listener = new QueryResultsListener();
- String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
- UserProtos.QueryType queryType;
- type = type.toLowerCase();
- switch(type) {
- case "logical":
- queryType = UserProtos.QueryType.LOGICAL;
- break;
- case "physical":
- queryType = UserProtos.QueryType.PHYSICAL;
- break;
- default:
- System.out.println("Invalid query type: " + type);
- return -1;
+ client.connect();
+ QueryResultsListener listener = new QueryResultsListener();
+ String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString();
+ UserProtos.QueryType queryType;
+ type = type.toLowerCase();
+ switch(type) {
+ case "sql":
+ queryType = UserProtos.QueryType.SQL;
+ break;
+ case "logical":
+ queryType = UserProtos.QueryType.LOGICAL;
+ break;
+ case "physical":
+ queryType = UserProtos.QueryType.PHYSICAL;
+ break;
+ default:
+ System.out.println("Invalid query type: " + type);
+ return -1;
+ }
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ client.runQuery(queryType, plan, listener);
+ int rows = listener.await();
+ System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000));
+ return 0;
+ }finally{
+ if(client != null) client.close();
}
- Stopwatch watch = new Stopwatch();
- watch.start();
- client.runQuery(queryType, plan, listener);
- int rows = listener.await();
- System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000));
- return 0;
}
private class QueryResultsListener implements UserResultsListener {
@@ -150,7 +148,7 @@ public class QuerySubmitter {
}
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount();
if (result.getData() != null) {
count.addAndGet(rows);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
index 904afb5c4..080679b87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
@@ -112,10 +112,7 @@ class MergeAdapter extends ClassVisitor {
return null;
}
if(arg3 != null){
- System.out.println("a: " + arg3);
arg3 = arg3.replace(set.precompiled.slash, set.generated.slash);
- System.out.println("b: " + arg3);
-
}
// if( (access & Modifier.PUBLIC) == 0){
// access = access ^ Modifier.PUBLIC ^ Modifier.PROTECTED | Modifier.PRIVATE;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 36433ad29..cbd722c03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -24,16 +24,18 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.visitors.SimpleExprVisitor;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.record.NullExpression;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
import com.google.common.collect.Lists;
-import org.apache.drill.exec.record.VectorAccessible;
-
public class ExpressionTreeMaterializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
@@ -45,7 +47,12 @@ public class ExpressionTreeMaterializer {
public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry) {
LogicalExpression materializedExpr = expr.accept(new MaterializeVisitor(batch, errorCollector), null);
- return ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry);
+ LogicalExpression out = ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry);
+ if(out instanceof NullExpression){
+ return new TypedNullConstant(Types.optional(MinorType.INT));
+ }else{
+ return out;
+ }
}
private static class MaterializeVisitor extends SimpleExprVisitor<LogicalExpression> {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index fd965a704..80a78191d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import net.hydromatic.optiq.SchemaPlus;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
@@ -75,6 +77,7 @@ public class FragmentContext implements Closeable {
this.connection = connection;
this.fragment = fragment;
this.funcRegistry = funcRegistry;
+ logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
}
@@ -91,6 +94,10 @@ public class FragmentContext implements Closeable {
public DrillbitContext getDrillbitContext() {
return context;
}
+
+ public SchemaPlus getRootSchema(){
+ return null;
+ }
/**
* Get this node's identity.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 256e7729a..11658e63c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -21,9 +21,11 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FunctionRegistry;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.sql.DrillSchemaFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -81,4 +83,13 @@ public class QueryContext {
return workBus;
}
+ public DrillSchemaFactory getSchemaFactory(){
+ return drillbitContext.getSchemaFactory();
+ }
+
+ public FunctionRegistry getFunctionRegistry(){
+ return drillbitContext.getFunctionRegistry();
+
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 6e58ec778..cd59428f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
@@ -33,7 +32,6 @@ import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.common.logical.data.*;
-import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
@@ -50,6 +48,8 @@ import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.store.StorageEngine;
+import org.eigenbase.rel.RelFieldCollation.Direction;
+import org.eigenbase.rel.RelFieldCollation.NullDirection;
import com.beust.jcommander.internal.Lists;
@@ -123,9 +123,9 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException {
PhysicalOperator input = order.getInput().accept(this, value);
- List<OrderDef> ods = Lists.newArrayList();
+ List<Ordering> ods = Lists.newArrayList();
for(Ordering o : order.getOrderings()){
- ods.add(OrderDef.create(o));
+ ods.add(o);
}
return new SelectionVectorRemover(new Sort(input, ods, false));
}
@@ -150,13 +150,13 @@ public class BasicOptimizer extends Optimizer{
}
// a collapsing aggregate is a currently implemented as a sort followed by a streaming aggregate.
- List<OrderDef> orderDefs = Lists.newArrayList();
+ List<Ordering> orderDefs = Lists.newArrayList();
List<NamedExpression> keys = Lists.newArrayList();
for(LogicalExpression e : segment.getExprs()){
if( !(e instanceof SchemaPath)) throw new OptimizerException("The basic optimizer doesn't currently support collapsing aggregate where the segment value is something other than a SchemaPath.");
keys.add(new NamedExpression(e, new FieldReference((SchemaPath) e)));
- orderDefs.add(new OrderDef(Direction.ASC, e));
+ orderDefs.add(new Ordering(Direction.Ascending, e, NullDirection.FIRST));
}
Sort sort = new Sort(segment.getInput().accept(this, value), orderDefs, false);
@@ -169,22 +169,22 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException {
PhysicalOperator leftOp = join.getLeft().accept(this, value);
- List<OrderDef> leftOrderDefs = Lists.newArrayList();
+ List<Ordering> leftOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
- leftOrderDefs.add(new OrderDef(Direction.ASC, jc.getLeft()));
+ leftOrderDefs.add(new Ordering(Direction.Ascending, jc.getLeft()));
}
leftOp = new Sort(leftOp, leftOrderDefs, false);
leftOp = new SelectionVectorRemover(leftOp);
PhysicalOperator rightOp = join.getRight().accept(this, value);
- List<OrderDef> rightOrderDefs = Lists.newArrayList();
+ List<Ordering> rightOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
- rightOrderDefs.add(new OrderDef(Direction.ASC, jc.getRight()));
+ rightOrderDefs.add(new Ordering(Direction.Ascending, jc.getRight()));
}
rightOp = new Sort(rightOp, rightOrderDefs, false);
rightOp = new SelectionVectorRemover(rightOp);
- MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJointType());
+ MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
return new SelectionVectorRemover(mjp);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
index 76916ea44..fde88a9b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java
@@ -21,13 +21,14 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.Join.JoinType;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.sql.SqlJoinOperator.JoinType;
import com.beust.jcommander.internal.Lists;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -44,7 +45,7 @@ public class MergeJoinPOP extends AbstractBase{
private final PhysicalOperator left;
private final PhysicalOperator right;
private final List<JoinCondition> conditions;
- private final Join.JoinType joinType;
+ private final JoinRelType joinType;
@Override
public OperatorCost getCost() {
@@ -56,7 +57,7 @@ public class MergeJoinPOP extends AbstractBase{
@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("join-conditions") List<JoinCondition> conditions,
- @JsonProperty("join-type") Join.JoinType joinType
+ @JsonProperty("join-type") JoinRelType joinType
) {
this.left = left;
this.right = right;
@@ -93,7 +94,7 @@ public class MergeJoinPOP extends AbstractBase{
return right;
}
- public Join.JoinType getJoinType() {
+ public JoinRelType getJoinType() {
return joinType;
}
@@ -102,12 +103,12 @@ public class MergeJoinPOP extends AbstractBase{
}
public MergeJoinPOP flipIfRight(){
- if(joinType == JoinType.RIGHT){
+ if(joinType == JoinRelType.RIGHT){
List<JoinCondition> flippedConditions = Lists.newArrayList(conditions.size());
for(JoinCondition c : conditions){
flippedConditions.add(c.flip());
}
- return new MergeJoinPOP(right, left, flippedConditions, JoinType.LEFT);
+ return new MergeJoinPOP(right, left, flippedConditions, JoinRelType.LEFT);
}else{
return this;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 667cc33a4..549c65c1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -17,18 +17,18 @@
*/
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
-import org.apache.drill.common.expression.LogicalExpression;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractReceiver;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
// The goal of this operator is to produce outgoing batches with records
// ordered according to the supplied expression. Each incoming batch
@@ -39,12 +39,12 @@ public class MergingReceiverPOP extends AbstractReceiver{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverPOP.class);
private final List<DrillbitEndpoint> senders;
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
@JsonCreator
public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("senders") List<DrillbitEndpoint> senders,
- @JsonProperty("orderings") List<OrderDef> orderings) {
+ @JsonProperty("orderings") List<Ordering> orderings) {
super(oppositeMajorFragmentId);
this.senders = senders;
this.orderings = orderings;
@@ -78,7 +78,7 @@ public class MergingReceiverPOP extends AbstractReceiver{
return new Size(1,1);
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index f0aa85bdc..c49509f40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -17,27 +17,27 @@
*/
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
@JsonTypeName("ordered-partition-exchange")
public class OrderedPartitionExchange extends AbstractExchange {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionExchange.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private final FieldReference ref;
private int recordsToSample = 10000; // How many records must be received before analyzing
private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of partitions to send to distributed cache
@@ -48,7 +48,7 @@ public class OrderedPartitionExchange extends AbstractExchange {
private List<DrillbitEndpoint> receiverLocations;
@JsonCreator
- public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref,
+ public OrderedPartitionExchange(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref,
@JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample,
@JsonProperty("samplingFactor") Integer samplingFactor, @JsonProperty("completionFactor") Float completionFactor) {
super(child);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
index de5cf04ef..55632a214 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java
@@ -17,24 +17,27 @@
*/
package org.apache.drill.exec.physical.config;
-import com.beust.jcommander.internal.Lists;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
-import org.apache.drill.exec.physical.base.*;
-import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import java.util.List;
+import com.beust.jcommander.internal.Lists;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("OrderedPartitionSender")
public class OrderedPartitionSender extends AbstractSender {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionSender.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private final FieldReference ref;
private final List<DrillbitEndpoint> endpoints;
private final int sendingWidth;
@@ -44,7 +47,7 @@ public class OrderedPartitionSender extends AbstractSender {
private float completionFactor;
@JsonCreator
- public OrderedPartitionSender(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
+ public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child,
@JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample,
@JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) {
@@ -70,7 +73,7 @@ public class OrderedPartitionSender extends AbstractSender {
return endpoints;
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index fbeb6dfb6..b8073c175 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -18,25 +18,25 @@
package org.apache.drill.exec.physical.config;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.defs.OrderDef;
+import java.util.List;
+
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
-import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.AbstractExchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.physical.base.Sender;
import org.apache.drill.exec.proto.CoordinationProtos;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("single-merge-exchange")
public class SingleMergeExchange extends AbstractExchange {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleMergeExchange.class);
- private final List<OrderDef> orderExpr;
+ private final List<Ordering> orderExpr;
// ephemeral for setup tasks
private List<CoordinationProtos.DrillbitEndpoint> senderLocations;
@@ -44,7 +44,7 @@ public class SingleMergeExchange extends AbstractExchange {
@JsonCreator
public SingleMergeExchange(@JsonProperty("child") PhysicalOperator child,
- @JsonProperty("orderings") List<OrderDef> orderExpr) {
+ @JsonProperty("orderings") List<Ordering> orderExpr) {
super(child);
this.orderExpr = orderExpr;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
index 206e7cd4a..82aa830cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.config;
import java.util.List;
-import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -27,7 +27,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Size;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -35,17 +34,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class Sort extends AbstractSingle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
- private final List<OrderDef> orderings;
+ private final List<Ordering> orderings;
private boolean reverse = false;
@JsonCreator
- public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
+ public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
super(child);
this.orderings = orderings;
this.reverse = reverse;
}
- public List<OrderDef> getOrderings() {
+ public List<Ordering> getOrderings() {
return orderings;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index aae1a3c21..7c8a51cad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.record.VectorContainer;
+import org.eigenbase.rel.JoinRelType;
/**
* This join template uses a merge join to combine two ordered streams into a single larger batch. When joining
@@ -91,7 +92,7 @@ public abstract class JoinTemplate implements JoinWorker {
// validate input iterators (advancing to the next record batch if necessary)
if (!status.isRightPositionAllowed()) {
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) {
+ if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos()))
@@ -109,7 +110,7 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
- if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT)
+ if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT)
if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) {
return false;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 298b031f8..bd668e748 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.JoinRelType;
import com.google.common.collect.ImmutableList;
import com.sun.codemodel.JClass;
@@ -112,7 +113,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final RecordBatch right;
private final JoinStatus status;
private final JoinCondition condition;
- private final Join.JoinType joinType;
+ private final JoinRelType joinType;
private JoinWorker worker;
public MergeJoinBatchBuilder batchBuilder;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 9428b4650..3549a33bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -17,16 +17,16 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import com.google.common.base.Preconditions;
+import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.logical.data.Join.JoinType;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
+import org.eigenbase.rel.JoinRelType;
-import java.util.List;
+import com.google.common.base.Preconditions;
public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
@@ -34,7 +34,7 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
@Override
public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
- if(config.getJoinType() == JoinType.RIGHT){
+ if(config.getJoinType() == JoinRelType.RIGHT){
return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0));
}else{
return new MergeJoinBatch(config, context, children.get(0), children.get(1));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 43d3b457b..72f1ad93b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -25,14 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Direction;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -60,6 +59,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -367,7 +367,7 @@ public class MergingRecordBatch implements RecordBatch {
private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException {
// set up the expression evaluator and code generation
- final List<OrderDef> orderings = config.getOrderings();
+ final List<Ordering> orderings = config.getOrderings();
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<MergingReceiverGeneratorBase> cg =
CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -550,11 +550,11 @@ public class MergingRecordBatch implements RecordBatch {
// generate less than/greater than checks (fixing results for ASCending vs. DESCending)
cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1)))
._then()
- ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? 1 : -1));
+ ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? 1 : -1));
cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1)))
._then()
- ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? -1 : 1));
+ ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? -1 : 1));
++comparisonVectorIndex;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index da8978f39..381fbe24b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -23,7 +23,6 @@ import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -32,6 +31,7 @@ import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.cache.Counter;
@@ -69,6 +69,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -189,7 +190,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
// Uses the
- // the expressions from the OrderDefs to populate each column. There is one column for each OrderDef in
+ // the expressions from the Orderings to populate each column. There is one column for each Ordering in
// popConfig.orderings.
VectorContainer containerToCache = new VectorContainer();
@@ -293,11 +294,11 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
VectorContainer allSamplesContainer = new VectorContainer();
containerBuilder.build(context, allSamplesContainer);
- List<OrderDef> orderDefs = Lists.newArrayList();
+ List<Ordering> orderDefs = Lists.newArrayList();
int i = 0;
- for (OrderDef od : popConfig.getOrderings()) {
+ for (Ordering od : popConfig.getOrderings()) {
SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
- orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp)));
+ orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
}
// sort the data incoming samples.
@@ -330,8 +331,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
/**
* Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer
- * outgoing. Each OrderDef in orderings generates a column, and evaluation of the expression associated with each
- * OrderDef determines the value of each column. These records will later be sorted based on the values in each
+ * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each
+ * Ordering determines the value of each column. These records will later be sorted based on the values in each
* column, in the same order as the orderings.
*
* @param sv4
@@ -342,14 +343,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* @throws SchemaChangeException
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
- List<OrderDef> orderings) throws SchemaChangeException {
+ List<Ordering> orderings) throws SchemaChangeException {
List<ValueVector> localAllocationVectors = Lists.newArrayList();
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
int i = 0;
- for (OrderDef od : orderings) {
+ for (Ordering od : orderings) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry());
SchemaPath schemaPath = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN);
TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType())
@@ -521,7 +522,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
cg.setMappingSet(mainMapping);
int count = 0;
- for (OrderDef od : popConfig.getOrderings()) {
+ for (Ordering od : popConfig.getOrderings()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
if (collector.hasErrors())
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
@@ -538,7 +539,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
ClassGenerator.HoldingContainer out = cg.addExpr(f, false);
JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if (od.getDirection() == Order.Direction.ASC) {
+ if (od.getDirection() == Direction.Ascending) {
jc._then()._return(out.getValue());
} else {
jc._then()._return(out.getValue().minus());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 3b8154efd..509d13b41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -88,6 +88,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
return ref;
}
+ private boolean isWildcard(NamedExpression ex){
+ LogicalExpression expr = ex.getExpr();
+ LogicalExpression ref = ex.getRef();
+ if(expr instanceof SchemaPath && ref instanceof SchemaPath){
+ PathSegment e = ((SchemaPath) expr).getRootSegment();
+ PathSegment n = ((SchemaPath) ref).getRootSegment();
+ if(e.isNamed() && e.getNameSegment().getPath().equals("*") && n.isNamed() && n.getChild() != null && n.getChild().isNamed() && n.getChild().getNameSegment().getPath().equals("*")){
+ return true;
+ }
+ }
+ return false;
+ }
@Override
protected void setupNewSchema() throws SchemaChangeException{
@@ -99,33 +111,43 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- for(int i = 0; i < exprs.size(); i++){
- final NamedExpression namedExpression = exprs.get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
- final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
- if(collector.hasErrors()){
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
- }
-
- // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
- ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
- Preconditions.checkNotNull(incoming);
-
- TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+ if(exprs.size() == 1 && isWildcard(exprs.get(0))){
+ for(VectorWrapper<?> wrapper : incoming){
+ ValueVector vvIn = wrapper.getValueVector();
+ TransferPair tp = wrapper.getValueVector().getTransferPair(new FieldReference(vvIn.getField().getName()));
transfers.add(tp);
container.add(tp.getTo());
- logger.debug("Added transfer.");
- }else{
- // need to do evaluation.
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
- allocationVectors.add(vector);
- TypedFieldId fid = container.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
- cg.addExpr(write);
- logger.debug("Added eval.");
}
+ }else{
+ for(int i = 0; i < exprs.size(); i++){
+ final NamedExpression namedExpression = exprs.get(i);
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry());
+ final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
+ if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){
+ ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+ ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
+ Preconditions.checkNotNull(incoming);
+
+ TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+ transfers.add(tp);
+ container.add(tp.getTo());
+ logger.debug("Added transfer.");
+ }else{
+ // need to do evaluation.
+ ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ allocationVectors.add(vector);
+ TypedFieldId fid = container.add(vector);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
+ cg.addExpr(write);
+ logger.debug("Added eval.");
+ }
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 4d0473558..5fd12c000 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -20,13 +20,11 @@ package org.apache.drill.exec.physical.impl.sort;
import java.io.IOException;
import java.util.List;
-import org.apache.drill.common.defs.OrderDef;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.logical.data.Order.Direction;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -46,6 +44,7 @@ import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.eigenbase.rel.RelFieldCollation.Direction;
import com.google.common.collect.ImmutableList;
import com.sun.codemodel.JConditional;
@@ -165,20 +164,20 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return createNewSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
}
- public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
+ public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException {
final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping);
}
- public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
+ public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
throws ClassTransformationException, IOException, SchemaChangeException{
CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry());
ClassGenerator<Sorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(OrderDef od : orderings){
+ for(Ordering od : orderings){
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
@@ -194,7 +193,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
HoldingContainer out = g.addExpr(f, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASC){
+ if(od.getDirection() == Direction.Ascending){
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
new file mode 100644
index 000000000..b4c1bf9ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.BitSet;
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.AggregateRelBase;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Aggregation implemented in Drill.
+ */
+public class DrillAggregateRel extends AggregateRelBase implements DrillRel {
+ /** Creates a DrillAggregateRel. */
+ public DrillAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet,
+ List<AggregateCall> aggCalls) throws InvalidRelException {
+ super(cluster, traits, child, groupSet, aggCalls);
+ for (AggregateCall aggCall : aggCalls) {
+ if (aggCall.isDistinct()) {
+ throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates");
+ }
+ }
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ try {
+ return new DrillAggregateRel(getCluster(), traitSet, sole(inputs), getGroupSet(), aggCalls);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+
+ GroupingAggregate.Builder builder = GroupingAggregate.builder();
+ builder.setInput(implementor.visitChild(this, 0, getChild()));
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ final List<String> fields = getRowType().getFieldNames();
+
+ for (int group : BitSets.toIter(groupSet)) {
+ FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+ builder.addKey(fr, fr);
+ }
+
+ for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
+ FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i));
+ LogicalExpression expr = toDrill(aggCall.e, childFields, implementor);
+ builder.addExpr(ref, expr);
+ }
+
+ return builder.build();
+ }
+
+
+
+
+ private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) {
+ List<LogicalExpression> args = Lists.newArrayList();
+ for(Integer i : call.getArgList()){
+ args.add(new FieldReference(fn.get(i)));
+ }
+
+ // for count(1).
+ if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l));
+ LogicalExpression expr = implementor.getContext().getRegistry().createExpression(call.getAggregation().getName().toLowerCase(), ExpressionPosition.UNKNOWN, args);
+ return expr;
+ }
+
+ public static DrillAggregateRel convert(GroupingAggregate groupBy, ConversionContext value)
+ throws InvalidRelException {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
new file mode 100644
index 000000000..77f1ba6d3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.AggregateRel;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import java.util.logging.Logger;
+
+/**
+ * Rule that converts an {@link AggregateRel} to a {@link DrillAggregateRel}, implemented by a Drill "segment" operation
+ * followed by a "collapseaggregate" operation.
+ */
+public class DrillAggregateRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillAggregateRule();
+ protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+ private DrillAggregateRule() {
+ super(RelOptHelper.some(AggregateRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillAggregateRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final AggregateRel aggregate = (AggregateRel) call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = aggregate.getTraitSet().plus(DrillRel.CONVENTION);
+ final RelNode convertedInput = convert(input, traits);
+ try {
+ call.transformTo(new DrillAggregateRel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(),
+ aggregate.getAggCallList()));
+ } catch (InvalidRelException e) {
+ tracer.warning(e.toString());
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
new file mode 100644
index 000000000..cc147e315
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.FilterRelBase;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+
+/**
+ * Filter implemented in Drill.
+ */
+public class DrillFilterRel extends FilterRelBase implements DrillRel {
+ protected DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+ super(cluster, traits, child, condition);
+ assert getConvention() == CONVENTION;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillFilterRel(getCluster(), traitSet, sole(inputs), getCondition());
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(0.1);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final LogicalOperator input = implementor.visitChild(this, 0, getChild());
+ Filter f = new Filter(DrillOptiq.toDrill(implementor.getContext(), getChild(), getCondition()));
+ f.setInput(input);
+ return f;
+ }
+
+ public static DrillFilterRel convert(Filter filter, ConversionContext context) throws InvalidRelException{
+ RelNode input = context.toRel(filter.getInput());
+ return new DrillFilterRel(context.getCluster(), context.getLogicalTraits(), input, context.toRex(filter.getExpr()));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java
new file mode 100644
index 000000000..d4fb2391a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.FilterRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+
+/**
+ * Rule that converts a {@link org.eigenbase.rel.FilterRel} to a Drill "filter" operation.
+ */
+public class DrillFilterRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillFilterRule();
+
+ private DrillFilterRule() {
+ super(RelOptHelper.some(FilterRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillFilterRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final FilterRel filter = (FilterRel) call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = filter.getTraitSet().plus(DrillRel.CONVENTION);
+ final RelNode convertedInput = convert(input, traits);
+ call.transformTo(new DrillFilterRel(filter.getCluster(), traits, convertedInput, filter.getCondition()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java
new file mode 100644
index 000000000..acd218c25
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Set;
+
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.LogicalPlanBuilder;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.common.logical.PlanProperties.PlanType;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+import org.eigenbase.rel.RelNode;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Context for converting a tree of {@link DrillRel} nodes into a Drill logical plan.
+ */
+public class DrillImplementor {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillImplementor.class);
+
+ private Set<DrillTable> tables = Sets.newHashSet();
+ private LogicalPlanBuilder planBuilder = new LogicalPlanBuilder();
+ private LogicalPlan plan;
+ private final DrillParseContext context;
+
+
+ public DrillImplementor(DrillParseContext context, ResultMode mode) {
+ planBuilder.planProperties(PlanType.APACHE_DRILL_LOGICAL, 1, DrillImplementor.class.getName(), "", mode);
+ this.context = context;
+ }
+
+ public DrillParseContext getContext(){
+ return context;
+ }
+
+ public void registerSource(DrillTable table){
+ if(tables.add(table)){
+ planBuilder.addStorageEngine(table.getStorageEngineName(), table.getStorageEngineConfig());
+ }
+ }
+
+ public void go(DrillRel root) {
+ LogicalOperator rootLOP = root.implement(this);
+ rootLOP.accept(new AddOpsVisitor(), null);
+ }
+
+ public LogicalPlan getPlan(){
+ if(plan == null){
+ plan = planBuilder.build();
+ planBuilder = null;
+ }
+ return plan;
+ }
+
+ public LogicalOperator visitChild(DrillRel parent, int ordinal, RelNode child) {
+ return ((DrillRel) child).implement(this);
+ }
+
+ private class AddOpsVisitor extends AbstractLogicalVisitor<Void, Void, RuntimeException> {
+ @Override
+ public Void visitOp(LogicalOperator op, Void value) throws RuntimeException {
+ planBuilder.addLogicalOperator(op);
+ for(LogicalOperator o : op){
+ o.accept(this, null);
+ }
+ return null;
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
new file mode 100644
index 000000000..c08712ef6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRelBase;
+import org.eigenbase.rel.JoinRelType;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexUtil;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+import org.eigenbase.util.Pair;
+
+/**
+ * Join implemented in Drill.
+ */
+public class DrillJoinRel extends JoinRelBase implements DrillRel {
+ private final List<Integer> leftKeys = new ArrayList<>();
+ private final List<Integer> rightKeys = new ArrayList<>();
+
+ /** Creates a DrillJoinRel. */
+ public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType, Collections.<String> emptySet());
+
+ RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys);
+ if (!remaining.isAlwaysTrue()) {
+ throw new InvalidRelException("DrillJoinRel only supports equi-join");
+ }
+ }
+
+ @Override
+ public DrillJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType) {
+ try {
+ return new DrillJoinRel(getCluster(), traitSet, left, right, condition, joinType);
+ } catch (InvalidRelException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final List<String> fields = getRowType().getFieldNames();
+ assert isUnique(fields);
+ final int leftCount = left.getRowType().getFieldCount();
+ final List<String> leftFields = fields.subList(0, leftCount);
+ final List<String> rightFields = fields.subList(leftCount, fields.size());
+
+ final LogicalOperator leftOp = implementInput(implementor, 0, 0, left);
+ final LogicalOperator rightOp = implementInput(implementor, 1, leftCount, right);
+
+ Join.Builder builder = Join.builder();
+ builder.type(joinType);
+ builder.left(leftOp);
+ builder.right(rightOp);
+
+ for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+ builder.addCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)));
+ }
+ return builder.build();
+ }
+
+ /**
+ * Check to make sure that the fields of the inputs are the same as the output field names. If not, insert a project renaming them.
+ * @param implementor
+ * @param i
+ * @param offset
+ * @param input
+ * @return
+ */
+ private LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, RelNode input) {
+ final LogicalOperator inputOp = implementor.visitChild(this, i, input);
+ assert uniqueFieldNames(input.getRowType());
+ final List<String> fields = getRowType().getFieldNames();
+ final List<String> inputFields = input.getRowType().getFieldNames();
+ final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+ if (!outputFields.equals(inputFields)) {
+ // Ensure that input field names are the same as output field names.
+ // If there are duplicate field names on left and right, fields will get
+ // lost.
+ return rename(implementor, inputOp, inputFields, outputFields);
+ } else {
+ return inputOp;
+ }
+ }
+
+ private LogicalOperator rename(DrillImplementor implementor, LogicalOperator inputOp, List<String> inputFields, List<String> outputFields) {
+ Project.Builder builder = Project.builder();
+ builder.setInput(inputOp);
+ for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) {
+ builder.addExpr(new FieldReference(pair.right), new FieldReference(pair.left));
+ }
+ return builder.build();
+ }
+
+ public static DrillJoinRel convert(Join join, ConversionContext context) throws InvalidRelException{
+ RelNode left = context.toRel(join.getLeft());
+ RelNode right = context.toRel(join.getRight());
+
+ List<RexNode> joinConditions = new ArrayList<RexNode>();
+ // right fields appear after the LHS fields.
+ final int rightInputOffset = left.getRowType().getFieldCount();
+ for (JoinCondition condition : join.getConditions()) {
+ RelDataTypeField leftField = left.getRowType().getField(ExprHelper.getFieldName(condition.getLeft()), true);
+ RelDataTypeField rightField = right.getRowType().getField(ExprHelper.getFieldName(condition.getRight()), true);
+ joinConditions.add(
+ context.getRexBuilder().makeCall(
+ SqlStdOperatorTable.EQUALS,
+ context.getRexBuilder().makeInputRef(leftField.getType(), leftField.getIndex()),
+ context.getRexBuilder().makeInputRef(rightField.getType(), rightInputOffset + rightField.getIndex())
+ )
+ );
+ }
+ RexNode rexCondition = RexUtil.composeConjunction(context.getRexBuilder(), joinConditions, false);
+ return new DrillJoinRel(context.getCluster(), context.getLogicalTraits(), left, right, rexCondition, join.getJoinType());
+ }
+
+
+ /**
+ * Returns whether there are any elements in common between left and right.
+ */
+ private static <T> boolean intersects(List<T> left, List<T> right) {
+ return new HashSet<>(left).removeAll(right);
+ }
+
+ private boolean uniqueFieldNames(RelDataType rowType) {
+ return isUnique(rowType.getFieldNames());
+ }
+
+ private static <T> boolean isUnique(List<T> list) {
+ return new HashSet<>(list).size() == list.size();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
new file mode 100644
index 000000000..f32fa590f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.JoinRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+import org.eigenbase.trace.EigenbaseTrace;
+
+import java.util.logging.Logger;
+
+/**
+ * Rule that converts a {@link JoinRel} to a {@link DrillJoinRel}, which is implemented by Drill "join" operation.
+ */
+public class DrillJoinRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillJoinRule();
+ protected static final Logger tracer = EigenbaseTrace.getPlannerTracer();
+
+ private DrillJoinRule() {
+ super(
+ RelOptHelper.some(JoinRel.class, Convention.NONE, RelOptHelper.any(RelNode.class), RelOptHelper.any(RelNode.class)),
+ "DrillJoinRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final JoinRel join = (JoinRel) call.rel(0);
+ final RelNode left = call.rel(1);
+ final RelNode right = call.rel(2);
+ final RelTraitSet traits = join.getTraitSet().plus(DrillRel.CONVENTION);
+
+ final RelNode convertedLeft = convert(left, traits);
+ final RelNode convertedRight = convert(right, traits);
+ try {
+ call.transformTo(new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, join.getCondition(),
+ join.getJoinType()));
+ } catch (InvalidRelException e) {
+ tracer.warning(e.toString());
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
new file mode 100644
index 000000000..54be0527e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.type.SqlTypeName;
+
+public class DrillLimitRel extends SingleRel implements DrillRel {
+ private RexNode offset;
+ private RexNode fetch;
+
+ public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
+ super(cluster, traitSet, child);
+ this.offset = offset;
+ this.fetch = fetch;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator inputOp = implementor.visitChild(this, 0, getChild());
+
+ // First offset to include into results (inclusive). Null implies it is starting from offset 0
+ int first = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0;
+
+ // Last offset to stop including into results (exclusive), translating fetch row counts into an offset.
+ // Null value implies including entire remaining result set from first offset
+ Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null;
+ Limit limit = new Limit(first, last);
+ limit.setInput(inputOp);
+ return limit;
+ }
+
+ public static DrillLimitRel convert(Limit limit, ConversionContext context) throws InvalidRelException{
+ RelNode input = context.toRel(limit.getInput());
+ RexNode first = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getFirst()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ RexNode last = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getLast()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+ return new DrillLimitRel(context.getCluster(), context.getLogicalTraits(), input, first, last);
+ }
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
new file mode 100644
index 000000000..85c594e45
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and Limit Rel
+ */
+public class DrillLimitRule extends RelOptRule {
+ public static DrillLimitRule INSTANCE = new DrillLimitRule();
+
+ private DrillLimitRule() {
+ super(RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillLimitRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final SortRel sort = call.rel(0);
+ return sort.offset != null || sort.fetch != null;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final SortRel incomingSort = call.rel(0);
+ final RelTraitSet incomingTraits = incomingSort.getTraitSet();
+ RelNode input = incomingSort.getChild();
+
+ // if the Optiq sort rel includes a collation and a limit, we need to create a copy the sort rel that excludes the
+ // limit information.
+ if (!incomingSort.getCollation().getFieldCollations().isEmpty()) {
+ input = incomingSort.copy(incomingTraits, input, incomingSort.getCollation(), null, null);
+ }
+
+ RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.CONVENTION));
+ call.transformTo(new DrillLimitRel(incomingSort.getCluster(), incomingTraits.plus(DrillRel.CONVENTION), convertedInput, incomingSort.offset, incomingSort.fetch));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
new file mode 100644
index 000000000..2e29bd425
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.math.BigDecimal;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.exec.record.NullExpression;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexCorrelVariable;
+import org.eigenbase.rex.RexDynamicParam;
+import org.eigenbase.rex.RexFieldAccess;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexLiteral;
+import org.eigenbase.rex.RexLocalRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.eigenbase.rex.RexRangeRef;
+import org.eigenbase.rex.RexVisitorImpl;
+import org.eigenbase.sql.SqlSyntax;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Utilities for Drill's planner.
+ */
+public class DrillOptiq {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOptiq.class);
+
+ /**
+ * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax.
+ */
+ static LogicalExpression toDrill(DrillParseContext context, RelNode input, RexNode expr) {
+ final RexToDrill visitor = new RexToDrill(context, input);
+ return expr.accept(visitor);
+ }
+
+ private static class RexToDrill extends RexVisitorImpl<LogicalExpression> {
+ private final RelNode input;
+ private final DrillParseContext context;
+
+ RexToDrill(DrillParseContext context, RelNode input) {
+ super(true);
+ this.context = context;
+ this.input = input;
+ }
+
+ @Override
+ public LogicalExpression visitInputRef(RexInputRef inputRef) {
+ final int index = inputRef.getIndex();
+ final RelDataTypeField field = input.getRowType().getFieldList().get(index);
+ return new FieldReference(field.getName());
+ }
+
+ @Override
+ public LogicalExpression visitCall(RexCall call) {
+ logger.debug("RexCall {}, {}", call);
+ final SqlSyntax syntax = call.getOperator().getSyntax();
+ switch (syntax) {
+ case BINARY:
+ logger.debug("Binary");
+ LogicalExpression op1 = call.getOperands().get(0).accept(this);
+ LogicalExpression op2 = call.getOperands().get(1).accept(this);
+ return context.getRegistry().createExpression(call.getOperator().getName(), Lists.newArrayList(op1, op2));
+ case FUNCTION:
+ logger.debug("Function");
+ List<LogicalExpression> exprs = Lists.newArrayList();
+ for(RexNode n : call.getOperands()){
+ exprs.add(n.accept(this));
+ }
+ return context.getRegistry().createExpression(call.getOperator().getName().toLowerCase(), Lists.newArrayList(exprs));
+ case SPECIAL:
+ logger.debug("Special");
+ switch(call.getKind()){
+
+ case CAST:
+ return getDrillCastFunctionFromOptiq(call);
+ }
+
+ if (call.getOperator() == SqlStdOperatorTable.ITEM) {
+ SchemaPath left = (SchemaPath) call.getOperands().get(0).accept(this);
+ final RexLiteral literal = (RexLiteral) call.getOperands().get(1);
+ return left.getChild((String) literal.getValue2());
+ }
+
+ // fall through
+ default:
+ throw new AssertionError("todo: implement syntax " + syntax + "(" + call + ")");
+ }
+ }
+
+ private LogicalExpression doUnknown(Object o){
+ logger.warn("Doesn't currently support consumption of {}.", o);
+ return NullExpression.INSTANCE;
+ }
+ @Override
+ public LogicalExpression visitLocalRef(RexLocalRef localRef) {
+ return doUnknown(localRef);
+ }
+
+ @Override
+ public LogicalExpression visitOver(RexOver over) {
+ return doUnknown(over);
+ }
+
+ @Override
+ public LogicalExpression visitCorrelVariable(RexCorrelVariable correlVariable) {
+ return doUnknown(correlVariable);
+ }
+
+ @Override
+ public LogicalExpression visitDynamicParam(RexDynamicParam dynamicParam) {
+ return doUnknown(dynamicParam);
+ }
+
+ @Override
+ public LogicalExpression visitRangeRef(RexRangeRef rangeRef) {
+ return doUnknown(rangeRef);
+ }
+
+ @Override
+ public LogicalExpression visitFieldAccess(RexFieldAccess fieldAccess) {
+ return super.visitFieldAccess(fieldAccess);
+ }
+
+
+ private LogicalExpression getDrillCastFunctionFromOptiq(RexCall call){
+ LogicalExpression arg = call.getOperands().get(0).accept(this);
+ List<LogicalExpression> args = Collections.singletonList(arg);
+ String fname = null;
+ switch(call.getType().getSqlTypeName().getName()){
+ case "VARCHAR": {
+ args = Lists.newArrayList(arg, new LongExpression(call.getType().getPrecision()));
+ return context.getRegistry().createExpression("castVARCHAR", args);
+ }
+ case "INTEGER": fname = "castINT"; break;
+ case "FLOAT": fname = "castFLOAT4"; break;
+ case "DOUBLE": fname = "castFLOAT8"; break;
+ case "DECIMAL": throw new UnsupportedOperationException("Need to add decimal.");
+ default: fname = "cast" + call.getType().getSqlTypeName().getName();
+ }
+ return context.getRegistry().createExpression(fname, args);
+
+ }
+
+
+
+ @Override
+ public LogicalExpression visitLiteral(RexLiteral literal) {
+ switch(literal.getTypeName()){
+ case BIGINT:
+ long l = ((BigDecimal) literal.getValue()).longValue();
+ return ValueExpressions.getBigInt(l);
+ case BOOLEAN:
+ return ValueExpressions.getBit(((Boolean) literal.getValue()));
+ case CHAR:
+ return ValueExpressions.getChar(((String) literal.getValue()));
+ case DOUBLE:
+ double d = ((BigDecimal) literal.getValue()).doubleValue();
+ return ValueExpressions.getFloat8(d);
+ case FLOAT:
+ float f = ((BigDecimal) literal.getValue()).floatValue();
+ return ValueExpressions.getFloat4(f);
+ case INTEGER:
+ case DECIMAL:
+ int i = ((BigDecimal) literal.getValue()).intValue();
+ return ValueExpressions.getInt(i);
+ case VARCHAR:
+ return ValueExpressions.getChar(((String) literal.getValue()));
+ default:
+ throw new UnsupportedOperationException(String.format("Unable to convert the value of %s and type %s to a Drill constant expression.", literal, literal.getTypeName()));
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java
new file mode 100644
index 000000000..b82fef56a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.common.expression.FunctionRegistry;
+
+public class DrillParseContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParseContext.class);
+
+ private final FunctionRegistry registry;
+
+ public DrillParseContext(FunctionRegistry registry) {
+ super();
+ this.registry = registry;
+ }
+
+ public FunctionRegistry getRegistry(){
+ return registry;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
new file mode 100644
index 000000000..ee3a0f45a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.reltype.RelDataTypeFieldImpl;
+import org.eigenbase.reltype.RelRecordType;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.sql.type.SqlTypeName;
+import org.eigenbase.util.Pair;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+/**
+ * Project implemented in Drill.
+ */
+public class DrillProjectRel extends ProjectRelBase implements DrillRel {
+ protected DrillProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
+ RelDataType rowType) {
+ super(cluster, traits, child, exps, rowType, Flags.BOXED);
+ assert getConvention() == CONVENTION;
+ }
+
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillProjectRel(getCluster(), traitSet, sole(inputs), new ArrayList<RexNode>(exps), rowType);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(0.1);
+ }
+
+ private List<Pair<RexNode, String>> projects() {
+ return Pair.zip(exps, getRowType().getFieldNames());
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator inputOp = implementor.visitChild(this, 0, getChild());
+ Project.Builder builder = Project.builder();
+ builder.setInput(inputOp);
+ for (Pair<RexNode, String> pair : projects()) {
+ LogicalExpression expr = DrillOptiq.toDrill(implementor.getContext(), getChild(), pair.left);
+ builder.addExpr(new FieldReference("output." + pair.right), expr);
+ }
+ return builder.build();
+ }
+
+ public static DrillProjectRel convert(Project project, ConversionContext context) throws InvalidRelException{
+ RelNode input = context.toRel(project.getInput());
+ List<RelDataTypeField> fields = Lists.newArrayList();
+ List<RexNode> exps = Lists.newArrayList();
+ for(NamedExpression expr : project.getSelections()){
+ fields.add(new RelDataTypeFieldImpl(expr.getRef().getPath().toString(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) ));
+ exps.add(context.toRex(expr.getExpr()));
+ }
+ return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java
new file mode 100644
index 000000000..bf5bcffc7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * Rule that converts a {@link org.eigenbase.rel.ProjectRel} to a Drill "project" operation.
+ */
+public class DrillProjectRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillProjectRule();
+
+ private DrillProjectRule() {
+ super(RelOptHelper.some(ProjectRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillProjectRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ProjectRel project = (ProjectRel) call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = project.getTraitSet().plus(DrillRel.CONVENTION);
+ final RelNode convertedInput = convert(input, traits);
+ call.transformTo(new DrillProjectRel(project.getCluster(), traits, convertedInput, project.getProjects(), project
+ .getRowType()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
new file mode 100644
index 000000000..63a7207c1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.Convention;
+
+/**
+ * Relational expression that is implemented in Drill.
+ */
+public interface DrillRel extends RelNode {
+ /** Calling convention for relational expressions that are "implemented" by
+ * generating Drill logical plans. */
+ Convention CONVENTION = new Convention.Impl("DRILL", DrillRel.class);
+
+ LogicalOperator implement(DrillImplementor implementor);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
new file mode 100644
index 000000000..53da67f6c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Iterator;
+
+import net.hydromatic.optiq.tools.RuleSet;
+
+import org.eigenbase.rel.rules.MergeProjectRule;
+import org.eigenbase.rel.rules.PushFilterPastJoinRule;
+import org.eigenbase.rel.rules.PushFilterPastProjectRule;
+import org.eigenbase.rel.rules.PushJoinThroughJoinRule;
+import org.eigenbase.rel.rules.PushSortPastProjectRule;
+import org.eigenbase.rel.rules.ReduceAggregatesRule;
+import org.eigenbase.rel.rules.RemoveDistinctAggregateRule;
+import org.eigenbase.rel.rules.RemoveDistinctRule;
+import org.eigenbase.rel.rules.RemoveSortRule;
+import org.eigenbase.rel.rules.RemoveTrivialCalcRule;
+import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
+import org.eigenbase.rel.rules.SwapJoinRule;
+import org.eigenbase.rel.rules.TableAccessRule;
+import org.eigenbase.rel.rules.UnionToDistinctRule;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.volcano.AbstractConverter.ExpandConversionRule;
+
+import com.google.common.collect.ImmutableSet;
+
+public class DrillRuleSets {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class);
+
+ public static final RuleSet DRILL_BASIC_RULES = new DrillRuleSet(ImmutableSet.of( //
+
+// ExpandConversionRule.instance,
+// SwapJoinRule.instance,
+// RemoveDistinctRule.instance,
+// UnionToDistinctRule.instance,
+// RemoveTrivialProjectRule.instance,
+// RemoveTrivialCalcRule.instance,
+// RemoveSortRule.INSTANCE,
+//
+// TableAccessRule.instance, //
+// MergeProjectRule.instance, //
+// PushFilterPastProjectRule.instance, //
+// PushFilterPastJoinRule.FILTER_ON_JOIN, //
+// RemoveDistinctAggregateRule.instance, //
+// ReduceAggregatesRule.instance, //
+// SwapJoinRule.instance, //
+// PushJoinThroughJoinRule.RIGHT, //
+// PushJoinThroughJoinRule.LEFT, //
+// PushSortPastProjectRule.INSTANCE, //
+
+ DrillScanRule.INSTANCE,
+ DrillFilterRule.INSTANCE,
+ DrillProjectRule.INSTANCE,
+ DrillAggregateRule.INSTANCE,
+
+ DrillLimitRule.INSTANCE,
+ DrillSortRule.INSTANCE,
+ DrillJoinRule.INSTANCE,
+ DrillUnionRule.INSTANCE
+ ));
+
+
+ private static class DrillRuleSet implements RuleSet{
+ final ImmutableSet<RelOptRule> rules;
+
+ public DrillRuleSet(ImmutableSet<RelOptRule> rules) {
+ super();
+ this.rules = rules;
+ }
+
+ @Override
+ public Iterator<RelOptRule> iterator() {
+ return rules.iterator();
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
new file mode 100644
index 000000000..afc2d1bd4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.TableAccessRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * GroupScan of a Drill table.
+ */
+public class DrillScanRel extends TableAccessRelBase implements DrillRel {
+ private final DrillTable drillTable;
+
+ /** Creates a DrillScan. */
+ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table) {
+ super(cluster, traits, table);
+ assert getConvention() == CONVENTION;
+ this.drillTable = table.unwrap(DrillTable.class);
+ assert drillTable != null;
+ }
+
+// @Override
+// public void register(RelOptPlanner planner) {
+// super.register(planner);
+// DrillOptiq.registerStandardPlannerRules(planner);
+// }
+
+ public LogicalOperator implement(DrillImplementor implementor) {
+ Scan.Builder builder = Scan.builder();
+ builder.storageEngine(drillTable.getStorageEngineName());
+ builder.selection(new JSONOptions(drillTable.getSelection()));
+ //builder.outputReference(new FieldReference("_MAP"));
+ implementor.registerSource(drillTable);
+ return builder.build();
+ }
+
+ public static DrillScanRel convert(Scan scan, ConversionContext context){
+ return new DrillScanRel(context.getCluster(), context.getLogicalTraits(), context.getTable(scan));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java
new file mode 100644
index 000000000..58e648a73
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel;
+
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class DrillScanRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillScanRule();
+
+ private DrillScanRule() {
+ super(RelOptHelper.any(EnumerableTableAccessRel.class), "DrillTableRule");
+ }
+
+
+
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final EnumerableTableAccessRel access = (EnumerableTableAccessRel) call.rel(0);
+ final RelTraitSet traits = access.getTraitSet().plus(DrillRel.CONVENTION);
+ call.transformTo(new DrillScanRel(access.getCluster(), traits, access.getTable()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java
new file mode 100644
index 000000000..829947adb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import net.hydromatic.optiq.impl.java.JavaTypeFactory;
+import net.hydromatic.optiq.rules.java.EnumerableConvention;
+import net.hydromatic.optiq.rules.java.EnumerableRel;
+import net.hydromatic.optiq.rules.java.EnumerableRelImplementor;
+import net.hydromatic.optiq.rules.java.JavaRowFormat;
+import net.hydromatic.optiq.rules.java.PhysType;
+import net.hydromatic.optiq.rules.java.PhysTypeImpl;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Store;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SingleRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Relational expression that converts from Drill to Enumerable. At runtime it executes a Drill query and returns the
+ * results as an {@link net.hydromatic.linq4j.Enumerable}.
+ */
+public class DrillScreenRel extends SingleRel implements DrillRel {
+ private static final Logger logger = LoggerFactory.getLogger(DrillScreenRel.class);
+
+ private PhysType physType;
+
+ public DrillScreenRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) {
+ super(cluster, traitSet, input);
+ assert input.getConvention() == DrillRel.CONVENTION;
+ physType = PhysTypeImpl.of((JavaTypeFactory) cluster.getTypeFactory(), input.getRowType(), JavaRowFormat.ARRAY);
+ }
+
+ public PhysType getPhysType() {
+ return physType;
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(.1);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillScreenRel(getCluster(), traitSet, sole(inputs));
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ LogicalOperator childOp = implementor.visitChild(this, 0, getChild());
+ return Store.builder().setInput(childOp).storageEngine("--SCREEN--").build();
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
new file mode 100644
index 000000000..04af9d5b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelCollation;
+import org.eigenbase.rel.RelCollationImpl;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexNode;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Sort implemented in Drill.
+ */
+public class DrillSortRel extends SortRel implements DrillRel {
+
+ /** Creates a DrillSortRel. */
+ public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) {
+ super(cluster, traits, input, collation);
+ }
+
+ /** Creates a DrillSortRel with offset and fetch. */
+ public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ super(cluster, traits, input, collation, offset, fetch);
+ }
+
+ @Override
+ public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) {
+ return new DrillSortRel(getCluster(), traitSet, input, collation, offset, fetch);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final Order.Builder builder = Order.builder();
+ builder.setInput(implementor.visitChild(this, 0, getChild()));
+
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ for(RelFieldCollation fieldCollation : this.collation.getFieldCollations()){
+ builder.addOrdering(fieldCollation.getDirection(),
+ new FieldReference(childFields.get(fieldCollation.getFieldIndex())),
+ fieldCollation.nullDirection);
+ }
+ return builder.build();
+ }
+
+
+ public static RelNode convert(Order order, ConversionContext context) throws InvalidRelException{
+
+ // if there are compound expressions in the order by, we need to convert into projects on either side.
+ RelNode input = context.toRel(order.getInput());
+ List<String> fields = input.getRowType().getFieldNames();
+
+ // build a map of field names to indices.
+ Map<String, Integer> fieldMap = Maps.newHashMap();
+ int i =0;
+ for(String field : fields){
+ fieldMap.put(field, i);
+ i++;
+ }
+
+ List<RelFieldCollation> collations = Lists.newArrayList();
+
+ for(Ordering o : order.getOrderings()){
+ String fieldName = ExprHelper.getFieldName(o.getExpr());
+ int fieldId = fieldMap.get(fieldName);
+ RelFieldCollation c = new RelFieldCollation(fieldId, o.getDirection(), o.getNullDirection());
+ }
+ return new DrillSortRel(context.getCluster(), context.getLogicalTraits(), input, RelCollationImpl.of(collations));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java
new file mode 100644
index 000000000..c968e8545
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.SortRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+
+/**
+ * Rule that converts an {@link SortRel} to a {@link DrillSortRel}, implemented by a Drill "order" operation.
+ */
+public class DrillSortRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillSortRule();
+
+ private DrillSortRule() {
+ super(RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillSortRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final SortRel sort = call.rel(0);
+ return sort.offset == null && sort.fetch == null;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+
+ final SortRel sort = call.rel(0);
+
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION);
+
+ final RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.CONVENTION));
+ call.transformTo(new DrillSortRel(sort.getCluster(), traits, convertedInput, sort.getCollation()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java
new file mode 100644
index 000000000..30c78109e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import net.hydromatic.optiq.prepare.Prepare.CatalogReader;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.TableModificationRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelTraitSet;
+
+public class DrillStoreRel extends TableModificationRelBase implements DrillRel{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillStoreRel.class);
+
+ protected DrillStoreRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, CatalogReader catalogReader,
+ RelNode child, Operation operation, List<String> updateColumnList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
+
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ return null;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
new file mode 100644
index 000000000..30dd48de2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collections;
+
+import net.hydromatic.optiq.Schema.TableType;
+import net.hydromatic.optiq.Statistic;
+import net.hydromatic.optiq.Statistics;
+import net.hydromatic.optiq.Table;
+
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.sql.type.SqlTypeName;
+
+/** Optiq Table used by Drill. */
+public class DrillTable implements Table{
+
+ private final String name;
+ private final String storageEngineName;
+ public final StorageEngineConfig storageEngineConfig;
+ private Object selection;
+
+
+ /** Creates a DrillTable. */
+ public DrillTable(String name, String storageEngineName, Object selection, StorageEngineConfig storageEngineConfig) {
+ this.name = name;
+ this.selection = selection;
+ this.storageEngineConfig = storageEngineConfig;
+ this.storageEngineName = storageEngineName;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public StorageEngineConfig getStorageEngineConfig(){
+ return storageEngineConfig;
+ }
+
+ public Object getSelection() {
+ return selection;
+ }
+
+ public String getStorageEngineName() {
+ return storageEngineName;
+ }
+
+ @Override
+ public Statistic getStatistic() {
+ return Statistics.UNKNOWN;
+ }
+
+ public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
+ return new DrillScanRel(context.getCluster(),
+ context.getCluster().traitSetOf(DrillRel.CONVENTION),
+ table);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+ return new RelDataTypeDrillImpl(typeFactory);
+ }
+
+ @Override
+ public TableType getJdbcTableType() {
+ return null;
+ }
+
+
+
+// /** Factory for custom tables in Optiq schema. */
+// @SuppressWarnings("UnusedDeclaration")
+// public static class Factory implements TableFactory<DrillTable> {
+//
+// @Override
+// public DrillTable create(Schema schema, String name, Map<String, Object> operand, RelDataType rowType) {
+//
+// final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig();
+// final ClasspathInputConfig inputConfig = new ClasspathInputConfig();
+// inputConfig.path = "/" + name.toLowerCase() + ".json";
+// inputConfig.type = DataWriter.ConverterType.JSON;
+// return createTable(schema.getTypeFactory(), (MutableSchema) schema, name, "donuts-json", rseConfig, inputConfig);
+// }
+// }
+
+
+
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
new file mode 100644
index 000000000..1be9cafea
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import net.hydromatic.linq4j.Ord;
+
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.UnionRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+
+/**
+ * Union implemented in Drill.
+ */
+public class DrillUnionRel extends UnionRelBase implements DrillRel {
+ /** Creates a DrillUnionRel. */
+ public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits,
+ List<RelNode> inputs, boolean all) {
+ super(cluster, traits, inputs, all);
+ }
+
+ @Override
+ public DrillUnionRel copy(RelTraitSet traitSet, List<RelNode> inputs,
+ boolean all) {
+ return new DrillUnionRel(getCluster(), traitSet, inputs, all);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ // divide cost by two to ensure cheaper than EnumerableDrillRel
+ return super.computeSelfCost(planner).multiplyBy(.5);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ Union.Builder builder = Union.builder();
+ for (Ord<RelNode> input : Ord.zip(inputs)) {
+ builder.addInput(implementor.visitChild(this, input.i, input.e));
+ }
+ builder.setDistinct(!all);
+ return builder.build();
+ }
+
+ public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
new file mode 100644
index 000000000..bc1e6f471
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.UnionRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rule that converts a {@link UnionRel} to a {@link DrillUnionRel}, implemented by a "union" operation.
+ */
+public class DrillUnionRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillUnionRule();
+
+ private DrillUnionRule() {
+ super(RelOptHelper.any(UnionRel.class, Convention.NONE), "DrillUnionRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final UnionRel union = (UnionRel) call.rel(0);
+ final RelTraitSet traits = union.getTraitSet().plus(DrillRel.CONVENTION);
+ final List<RelNode> convertedInputs = new ArrayList<>();
+ for (RelNode input : union.getInputs()) {
+ final RelNode convertedInput = convert(input, traits);
+ convertedInputs.add(convertedInput);
+ }
+ call.transformTo(new DrillUnionRel(union.getCluster(), traits, convertedInputs, union.all));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
new file mode 100644
index 000000000..e77018184
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.ValuesRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+/**
+ * Values implemented in Drill.
+ */
+public class DrillValuesRel extends ValuesRelBase implements DrillRel {
+ protected DrillValuesRel(RelOptCluster cluster, RelDataType rowType, List<List<RexLiteral>> tuples, RelTraitSet traits) {
+ super(cluster, rowType, tuples, traits);
+ assert getConvention() == CONVENTION;
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ assert inputs.isEmpty();
+ return new DrillValuesRel(getCluster(), rowType, tuples, traitSet);
+ }
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ return super.computeSelfCost(planner).multiplyBy(0.1);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ // Update when https://issues.apache.org/jira/browse/DRILL-57 fixed
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java
new file mode 100644
index 000000000..fae18caff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.ValuesRel;
+import org.eigenbase.relopt.*;
+
+/**
+ * Rule that converts a {@link ValuesRel} to a Drill "values" operation.
+ */
+public class DrillValuesRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillValuesRule();
+
+ private DrillValuesRule() {
+ super(RelOptHelper.any(ValuesRel.class, Convention.NONE), "DrillValuesRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ValuesRel values = (ValuesRel) call.rel(0);
+ final RelTraitSet traits = values.getTraitSet().plus(DrillRel.CONVENTION);
+ call.transformTo(new DrillValuesRel(values.getCluster(), values.getRowType(), values.getTuples(), traits));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java
new file mode 100644
index 000000000..2fcf6600c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import net.hydromatic.optiq.rules.java.EnumerableConvention;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.convert.ConverterRule;
+
+/**
+ * Rule that converts any Drill relational expression to enumerable format by adding a {@link DrillScreenRel}.
+ */
+public class EnumerableDrillRule extends ConverterRule {
+
+ public static EnumerableDrillRule INSTANCE = new EnumerableDrillRule(EnumerableConvention.INSTANCE);
+
+
+ private EnumerableDrillRule(EnumerableConvention outConvention) {
+ super(RelNode.class, DrillRel.CONVENTION, outConvention, "EnumerableDrillRule." + outConvention);
+ }
+
+ @Override
+ public boolean isGuaranteed() {
+ return true;
+ }
+
+ @Override
+ public RelNode convert(RelNode rel) {
+ assert rel.getTraitSet().contains(DrillRel.CONVENTION);
+ return new DrillScreenRel(rel.getCluster(), rel.getTraitSet().replace(getOutConvention()), rel);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
new file mode 100644
index 000000000..9456a8128
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+
+public class ExprHelper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExprHelper.class);
+
+ private final static String COMPOUND_FAIL_MESSAGE = "The current Optiq based logical plan interpreter does not complicated expressions. For Order By and Filter";
+
+ public static String getAggregateFieldName(FunctionCall c){
+ List<LogicalExpression> exprs = c.args;
+ if(exprs.size() != 1) throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
+ return getFieldName(exprs.iterator().next());
+ }
+
+ public static String getFieldName(LogicalExpression e){
+ if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString();
+ throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java
new file mode 100644
index 000000000..835e4a558
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+
+
+import org.eigenbase.reltype.*;
+import org.eigenbase.sql.SqlCollation;
+import org.eigenbase.sql.SqlIdentifier;
+import org.eigenbase.sql.SqlIntervalQualifier;
+import org.eigenbase.sql.type.SqlTypeName;
+
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+/* We use an instance of this class as the row type for
+ * Drill table. Since we don't know the schema before hand
+ * whenever optiq requires us to validate that a field exists
+ * we always return true and indicate that the type of that
+ * field is 'ANY'
+ */
+public class RelDataTypeDrillImpl extends RelDataTypeImpl {
+
+ private RelDataTypeField defaultField;
+ RelDataTypeFactory typeFactory;
+ List<RelDataTypeField> drillFieldList = new LinkedList<>();
+ List<String> drillfieldNames = new LinkedList<>();
+
+
+ public RelDataTypeDrillImpl(RelDataTypeFactory typeFactory) {
+ this.typeFactory = typeFactory;
+ computeDigest();
+ }
+
+ @Override
+ public List<RelDataTypeField> getFieldList() {
+
+ if (drillFieldList.size() == 0)
+ {
+ /* By default we only have a single row in drill of 'ANY' type
+ * (mainly for select * type queries)
+ */
+ defaultField = new RelDataTypeFieldImpl("*", 0, typeFactory.createSqlType(SqlTypeName.ANY));
+
+ drillFieldList.add(defaultField);
+ drillfieldNames.add("*");
+ }
+ return drillFieldList;
+ }
+
+
+ @Override
+ public int getFieldCount() {
+ return drillFieldList.size();
+ }
+
+// @Override
+// public int getFieldOrdinal(String fieldName, boolean caseSensitive) {
+//
+// /* Get the list of fields and return the
+// * index if the field exists
+// */
+// for (RelDataTypeField field : drillFieldList) {
+// if (field.getName().equals(fieldName))
+// return field.getIndex();
+// }
+//
+// /* Couldn't find the field in our list, return -1
+// * Unsure if I should add it to our list of fields
+// */
+// return -1;
+// }
+
+ @Override
+ /**
+ *
+ */
+ public RelDataTypeField getField(String fieldName, boolean caseSensitive) {
+
+ /* First check if this field name exists in our field list */
+ for (RelDataTypeField field : drillFieldList)
+ {
+ if (field.getName().equals(fieldName))
+ return field;
+ }
+
+ /* This field does not exist in our field list add it */
+ RelDataTypeField newField = new RelDataTypeFieldImpl(fieldName, drillFieldList.size(), typeFactory.createSqlType(SqlTypeName.ANY));
+
+ /* Add it to the list of fields */
+ drillFieldList.add(newField);
+
+ /* Add the name to our list of field names */
+ drillfieldNames.add(fieldName);
+
+ return newField;
+ }
+
+
+ @Override
+ public List<String> getFieldNames() {
+
+ if (drillfieldNames.size() == 0) {
+ drillfieldNames.add("*");
+ }
+
+ return drillfieldNames;
+ }
+
+ @Override
+ public SqlTypeName getSqlTypeName() {
+ return null;
+ }
+
+ @Override
+ protected void generateTypeString(StringBuilder sb, boolean withDetail) {
+ sb.append("DrillRecordRow");
+ }
+
+ @Override
+ public boolean isStruct() {
+ return true;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
new file mode 100644
index 000000000..92272f879
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleOperand;
+import org.eigenbase.relopt.RelTrait;
+
+public class RelOptHelper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RelOptHelper.class);
+
+ public static RelOptRuleOperand any(Class<? extends RelNode> first, RelTrait trait){
+ return RelOptRule.operand(first, trait, RelOptRule.any());
+ }
+
+ public static RelOptRuleOperand any(Class<? extends RelNode> first){
+ return RelOptRule.operand(first, RelOptRule.any());
+ }
+
+ public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){
+ return RelOptRule.operand(rel, RelOptRule.some(first, rest));
+ }
+
+ public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelTrait trait, RelOptRuleOperand first, RelOptRuleOperand... rest){
+ return RelOptRule.operand(rel, trait, RelOptRule.some(first, rest));
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
new file mode 100644
index 000000000..980977085
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Constant;
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.SinkOperator;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This visitor will walk a logical plan and record in a map the list of field references associated to each scan. These
+ * can then be used to update scan object to appear to be explicitly fielded for optimization purposes.
+ */
+public class ScanFieldDeterminer extends AbstractLogicalVisitor<Void, ScanFieldDeterminer.FieldList, RuntimeException> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanFieldDeterminer.class);
+
+ private FieldReferenceFinder finder = new FieldReferenceFinder();
+ private Map<Scan, FieldList> scanFields = Maps.newHashMap();
+
+
+ public static Map<Scan, FieldList> getFieldLists(LogicalPlan plan){
+ Collection<SinkOperator> ops = plan.getGraph().getRoots();
+ Preconditions.checkArgument(ops.size() == 1, "Scan Field determiner currently only works with plans that have a single root.");
+ ScanFieldDeterminer sfd = new ScanFieldDeterminer();
+ ops.iterator().next().accept(sfd, new FieldList());
+ return sfd.scanFields;
+ }
+
+ private ScanFieldDeterminer(){
+ }
+
+ public static class FieldList {
+ private Set<SchemaPath> projected = Sets.newHashSet();
+ private Set<SchemaPath> referenced = Sets.newHashSet();
+
+ public void addProjected(SchemaPath path) {
+ projected.add(path);
+ }
+
+ public void addReferenced(SchemaPath path) {
+ referenced.add(path);
+ }
+
+ public void addReferenced(Collection<SchemaPath> paths) {
+ referenced.addAll(paths);
+ }
+
+ public void addProjected(Collection<SchemaPath> paths) {
+ projected.addAll(paths);
+ }
+
+ public FieldList clone() {
+ FieldList newList = new FieldList();
+ for (SchemaPath p : projected) {
+ newList.addProjected(p);
+ }
+ for (SchemaPath p : referenced) {
+ newList.addReferenced(p);
+ }
+ return newList;
+ }
+ }
+
+ @Override
+ public Void visitScan(Scan scan, FieldList value) {
+ if (value == null) {
+ scanFields.put(scan, new FieldList());
+ } else {
+ scanFields.put(scan, value);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitStore(Store store, FieldList value) {
+ store.getInput().accept(this, value);
+ return null;
+ }
+
+ @Override
+ public Void visitGroupingAggregate(GroupingAggregate groupBy, FieldList value) {
+ FieldList list = new FieldList();
+ for (NamedExpression e : groupBy.getExprs()) {
+ list.addProjected(e.getExpr().accept(finder, null));
+ }
+ for (NamedExpression e : groupBy.getKeys()) {
+ list.addProjected(e.getExpr().accept(finder, null));
+ }
+ groupBy.getInput().accept(this, list);
+ return null;
+ }
+
+ @Override
+ public Void visitFilter(Filter filter, FieldList value) {
+ value.addReferenced(filter.getExpr().accept(finder, null));
+ return null;
+ }
+
+ @Override
+ public Void visitProject(Project project, FieldList value) {
+ FieldList fl = new FieldList();
+ for (NamedExpression e : project.getSelections()) {
+ fl.addProjected(e.getExpr().accept(finder, null));
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitConstant(Constant constant, FieldList value) {
+ return null;
+ }
+
+ @Override
+ public Void visitOrder(Order order, FieldList fl) {
+ for (Ordering o : order.getOrderings()) {
+ fl.addReferenced(o.getExpr().accept(finder, null));
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitJoin(Join join, FieldList fl) {
+ {
+ FieldList leftList = fl.clone();
+ for (JoinCondition c : join.getConditions()) {
+ leftList.addReferenced(c.getLeft().accept(finder, null));
+ }
+ join.getLeft().accept(this, leftList);
+ }
+
+ {
+ FieldList rightList = fl.clone();
+ for (JoinCondition c : join.getConditions()) {
+ rightList.addReferenced(c.getRight().accept(finder, null));
+ }
+ join.getLeft().accept(this, rightList);
+ }
+ return null;
+ }
+
+ @Override
+ public Void visitLimit(Limit limit, FieldList value) {
+ limit.getInput().accept(this, value);
+ return null;
+ }
+
+ @Override
+ public Void visitUnion(Union union, FieldList value) {
+ for (LogicalOperator o : union.getInputs()) {
+ o.accept(this, value.clone());
+ }
+ return null;
+ }
+
+
+ /**
+ * Search through a LogicalExpression, finding all internal schema path references and returning them in a set.
+ */
+ private class FieldReferenceFinder extends AbstractExprVisitor<Set<SchemaPath>, Void, RuntimeException> {
+
+ @Override
+ public Set<SchemaPath> visitSchemaPath(SchemaPath path, Void value) {
+ Set<SchemaPath> set = Sets.newHashSet();
+ set.add(path);
+ return set;
+ }
+
+ @Override
+ public Set<SchemaPath> visitUnknown(LogicalExpression e, Void value) {
+ Set<SchemaPath> paths = Sets.newHashSet();
+ for (LogicalExpression ex : e) {
+ paths.addAll(ex.accept(this, null));
+ }
+ return paths;
+ }
+
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
new file mode 100644
index 000000000..a020ab392
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+public class StorageEngines implements Iterable<Map.Entry<String, StorageEngineConfig>>{
+
+ private Map<String, StorageEngineConfig> storage;
+
+ @JsonCreator
+ public StorageEngines(@JsonProperty("storage") Map<String, StorageEngineConfig> storage){
+ this.storage = storage;
+ }
+
+ public static void main(String[] args) throws Exception{
+ DrillConfig config = DrillConfig.create();
+ String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+ StorageEngines se = config.getMapper().readValue(data, StorageEngines.class);
+ System.out.println(se);
+ }
+
+ @Override
+ public String toString() {
+ final int maxLen = 10;
+ return "StorageEngines [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]";
+ }
+
+ @Override
+ public Iterator<Entry<String, StorageEngineConfig>> iterator() {
+ return storage.entrySet().iterator();
+ }
+
+ private String toString(Collection<?> collection, int maxLen) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("[");
+ int i = 0;
+ for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
+ if (i > 0)
+ builder.append(", ");
+ builder.append(iterator.next());
+ }
+ builder.append("]");
+ return builder.toString();
+ }
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java
new file mode 100644
index 000000000..23d03b81c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import net.hydromatic.linq4j.function.Function1;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.store.SchemaProvider;
+import org.apache.drill.exec.store.SchemaProviderRegistry;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class DrillSchemaFactory implements Function1<SchemaPlus, Schema>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSchemaFactory.class);
+
+ private final SchemaProviderRegistry registry;
+ private final Map<String, StorageEngineEntry> preEntries = Maps.newHashMap();
+
+ public static DrillSchemaFactory createEmpty(){
+ return new DrillSchemaFactory();
+ }
+
+ private DrillSchemaFactory(){
+ this.registry = null;
+ }
+
+ public DrillSchemaFactory(StorageEngines engines, DrillConfig config) throws SetupException {
+ super();
+ this.registry = new SchemaProviderRegistry(config);
+
+ for (Map.Entry<String, StorageEngineConfig> entry : engines) {
+ SchemaProvider provider = registry.getSchemaProvider(entry.getValue());
+ preEntries.put(entry.getKey(), new StorageEngineEntry(entry.getValue(), provider));
+ }
+
+ }
+
+ public Schema apply(SchemaPlus root) {
+ List<String> schemaNames = Lists.newArrayList();
+ Schema defaultSchema = null;
+ for(Entry<String, StorageEngineEntry> e : preEntries.entrySet()){
+ FileSystemSchema schema = new FileSystemSchema(e.getValue().getConfig(), e.getValue().getProvider(), root, e.getKey());
+ if(defaultSchema == null) defaultSchema = schema;
+ root.add(schema);
+ schemaNames.add(e.getKey());
+ }
+ logger.debug("Registered schemas for {}", schemaNames);
+ return defaultSchema;
+ }
+
+
+ private class StorageEngineEntry{
+ StorageEngineConfig config;
+ SchemaProvider provider;
+
+
+ public StorageEngineEntry(StorageEngineConfig config, SchemaProvider provider) {
+ super();
+ this.config = config;
+ this.provider = provider;
+ }
+
+ public StorageEngineConfig getConfig() {
+ return config;
+ }
+ public SchemaProvider getProvider() {
+ return provider;
+ }
+
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
new file mode 100644
index 000000000..818e892c9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import net.hydromatic.optiq.jdbc.ConnectionConfig;
+import net.hydromatic.optiq.tools.Frameworks;
+import net.hydromatic.optiq.tools.Planner;
+import net.hydromatic.optiq.tools.RelConversionException;
+import net.hydromatic.optiq.tools.RuleSet;
+import net.hydromatic.optiq.tools.ValidationException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.FunctionRegistry;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.exec.planner.logical.DrillImplementor;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRuleSets;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.sql.SqlExplain;
+import org.eigenbase.sql.SqlKind;
+import org.eigenbase.sql.SqlLiteral;
+import org.eigenbase.sql.SqlNode;
+import org.eigenbase.sql.fun.SqlStdOperatorTable;
+import org.eigenbase.sql.parser.SqlParseException;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+public class DrillSqlWorker {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
+
+ private final FunctionRegistry registry;
+ private final Planner planner;
+
+ public DrillSqlWorker(DrillSchemaFactory schemaFactory, FunctionRegistry functionRegistry) throws Exception {
+ this.registry = functionRegistry;
+ this.planner = Frameworks.getPlanner(ConnectionConfig.Lex.MYSQL, schemaFactory, SqlStdOperatorTable.instance(), new RuleSet[]{DrillRuleSets.DRILL_BASIC_RULES});
+
+ }
+
+
+ public LogicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException{
+ SqlNode sqlNode = planner.parse(sql);
+
+ ResultMode resultMode = ResultMode.EXEC;
+ if(sqlNode.getKind() == SqlKind.EXPLAIN){
+ SqlExplain explain = (SqlExplain) sqlNode;
+ sqlNode = explain.operands[0];
+ SqlLiteral op = (SqlLiteral) explain.operands[2];
+ SqlExplain.Depth depth = (SqlExplain.Depth) op.getValue();
+ switch(depth){
+ case Logical:
+ resultMode = ResultMode.LOGICAL;
+ break;
+ case Physical:
+ resultMode = ResultMode.PHYSICAL;
+ break;
+ default:
+ }
+
+
+ }
+
+ SqlNode validatedNode = planner.validate(sqlNode);
+ RelNode relNode = planner.convert(validatedNode);
+ RelNode convertedRelNode = planner.transform(0, planner.getEmptyTraitSet().plus(DrillRel.CONVENTION), relNode);
+ if(convertedRelNode instanceof DrillStoreRel){
+ throw new UnsupportedOperationException();
+ }else{
+ convertedRelNode = new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), convertedRelNode);
+ }
+ DrillImplementor implementor = new DrillImplementor(new DrillParseContext(registry), resultMode);
+ implementor.go( (DrillRel) convertedRelNode);
+ planner.close();
+ planner.reset();
+ return implementor.getPlan();
+
+ }
+ private void x() throws Exception {
+ String sqlAgg = "select a, count(1) from parquet.`/Users/jnadeau/region.parquet` group by a";
+ String sql = "select * from parquet.`/Users/jnadeau/region.parquet`";
+
+
+ System.out.println(sql);
+ System.out.println(getPlan(sql).toJsonString(DrillConfig.create()));
+ System.out.println("///////////");
+ System.out.println(sqlAgg);
+ System.out.println(getPlan(sqlAgg).toJsonString(DrillConfig.create()));
+ }
+
+ public static void main(String[] args) throws Exception {
+ DrillConfig config = DrillConfig.create();
+
+ String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+ StorageEngines engines = config.getMapper().readValue(enginesData, StorageEngines.class);
+ FunctionRegistry fr = new FunctionRegistry(config);
+ DrillSchemaFactory schemaFactory = new DrillSchemaFactory(engines, config);
+ DrillSqlWorker worker = new DrillSqlWorker(schemaFactory, fr);
+ worker.x();
+ }
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java
new file mode 100644
index 000000000..e348bc331
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A special type of concurrent map which attempts to create an object before returning that it does not exist. It will also provide the same functionality on it's keyset.
+ * @param <KEY> The key in the map.
+ * @param <VALUE> The value in the map.
+ */
+public class ExpandingConcurrentMap<KEY, VALUE> implements ConcurrentMap<KEY, VALUE> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpandingConcurrentMap.class);
+
+ private final ConcurrentMap<KEY, VALUE> internalMap = Maps.newConcurrentMap();
+ private final DelegatingKeySet keySet = new DelegatingKeySet();
+ private final MapValueFactory<KEY, VALUE> fac;
+
+ /**
+ * Create a new ExpandingConcurrentMap.
+ * @param fac The object factory responsible for attempting to generate object instances.
+ */
+ public ExpandingConcurrentMap(MapValueFactory<KEY, VALUE> fac) {
+ super();
+ this.fac = fac;
+ }
+
+ @Override
+ public int size() {
+ return internalMap.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return internalMap.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object k) {
+ @SuppressWarnings("unchecked") KEY key = (KEY) k;
+
+ if(internalMap.containsKey(key)) return true;
+ VALUE v = getNewEntry(k);
+ return v != null;
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VALUE get(Object key) {
+ VALUE out = internalMap.get(key);
+ if(out != null) return out;
+ return getNewEntry(key);
+ }
+
+ private VALUE getNewEntry(Object k){
+ @SuppressWarnings("unchecked")
+ KEY key = (KEY) k;
+ VALUE v = this.fac.create(key);
+ if(v == null) return null;
+ VALUE old = internalMap.putIfAbsent(key, v);
+ if(old == null) return v;
+ fac.destroy(v);
+ return old;
+ }
+
+ @Override
+ public VALUE put(KEY key, VALUE value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VALUE remove(Object key) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void putAll(Map<? extends KEY, ? extends VALUE> m) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<KEY> keySet() {
+ return this.keySet;
+ }
+
+ @Override
+ public Collection<VALUE> values() {
+ return internalMap.values();
+ }
+
+ @Override
+ public Set<java.util.Map.Entry<KEY, VALUE>> entrySet() {
+ return internalMap.entrySet();
+ }
+
+ @Override
+ public VALUE putIfAbsent(KEY key, VALUE value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ return false;
+ }
+
+ @Override
+ public boolean replace(KEY key, VALUE oldValue, VALUE newValue) {
+ return false;
+ }
+
+ @Override
+ public VALUE replace(KEY key, VALUE value) {
+ return null;
+ }
+
+ private class DelegatingKeySet implements Set<KEY>{
+
+ @Override
+ public int size() {
+ return ExpandingConcurrentMap.this.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return ExpandingConcurrentMap.this.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return ExpandingConcurrentMap.this.containsKey(o);
+ }
+
+ @Override
+ public Iterator<KEY> iterator() {
+ return internalMap.keySet().iterator();
+ }
+
+ @Override
+ public Object[] toArray() {
+ return internalMap.keySet().toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ return internalMap.keySet().toArray(a);
+ }
+
+ @Override
+ public boolean add(KEY e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ for(Object o : c){
+ if(this.contains(o)) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends KEY> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ public interface MapValueFactory<KEY, VALUE> {
+
+ public VALUE create(KEY key);
+ public void destroy(VALUE value);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java
new file mode 100644
index 000000000..ac29fc3ef
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import net.hydromatic.linq4j.expressions.DefaultExpression;
+import net.hydromatic.linq4j.expressions.Expression;
+import net.hydromatic.optiq.Schema;
+import net.hydromatic.optiq.SchemaPlus;
+import net.hydromatic.optiq.TableFunction;
+
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaProvider;
+
+public class FileSystemSchema implements Schema, ExpandingConcurrentMap.MapValueFactory<String, DrillTable>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchema.class);
+
+ private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(this);
+
+ private final SchemaPlus parentSchema;
+ private final String name;
+ private final Expression expression = new DefaultExpression(Object.class);
+ private final SchemaProvider schemaProvider;
+ private final StorageEngineConfig config;
+
+ public FileSystemSchema(StorageEngineConfig config, SchemaProvider schemaProvider, SchemaPlus parentSchema, String name) {
+ super();
+ this.parentSchema = parentSchema;
+ this.name = name;
+ this.schemaProvider = schemaProvider;
+ this.config = config;
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public Expression getExpression() {
+ return expression;
+ }
+
+ @Override
+ public Collection<TableFunction> getTableFunctions(String name) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public SchemaPlus getParentSchema() {
+ return parentSchema;
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return tables.keySet();
+ }
+
+ @Override
+ public Set<String> getTableFunctionNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public boolean isMutable() {
+ return true;
+ }
+
+ @Override
+ public DrillTable getTable(String name) {
+ return tables.get(name);
+ }
+
+ @Override
+ public DrillTable create(String key) {
+ Object selection = schemaProvider.getSelectionBaseOnName(key);
+ if(selection == null) return null;
+
+ return new DrillTable(name, this.name, selection, config);
+ }
+
+ @Override
+ public void destroy(DrillTable value) {
+ }
+
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
new file mode 100644
index 000000000..d4aabb4a1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.torel;
+
+import java.util.List;
+import java.util.Map;
+
+import net.hydromatic.optiq.prepare.Prepare;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.data.Filter;
+import org.apache.drill.common.logical.data.GroupingAggregate;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.Limit;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.common.logical.data.Project;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Union;
+import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
+import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillFilterRel;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillLimitRel;
+import org.apache.drill.exec.planner.logical.DrillProjectRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.logical.DrillSortRel;
+import org.apache.drill.exec.planner.logical.DrillUnionRel;
+import org.apache.drill.exec.planner.logical.ScanFieldDeterminer;
+import org.apache.drill.exec.planner.logical.ScanFieldDeterminer.FieldList;
+import org.eigenbase.rel.InvalidRelException;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptTable;
+import org.eigenbase.relopt.RelOptTable.ToRelContext;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.rex.RexBuilder;
+import org.eigenbase.rex.RexNode;
+
+public class ConversionContext implements ToRelContext {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConversionContext.class);
+
+ private static final ConverterVisitor VISITOR = new ConverterVisitor();
+
+ private final Map<Scan, FieldList> scanFieldLists;
+ private final RelOptCluster cluster;
+ private final Prepare prepare;
+
+ public ConversionContext(RelOptCluster cluster, LogicalPlan plan) {
+ super();
+ scanFieldLists = ScanFieldDeterminer.getFieldLists(plan);
+ this.cluster = cluster;
+ this.prepare = null;
+ }
+
+ @Override
+ public RelOptCluster getCluster() {
+ return cluster;
+ }
+
+
+ private FieldList getFieldList(Scan scan) {
+ assert scanFieldLists.containsKey(scan);
+ return scanFieldLists.get(scan);
+ }
+
+
+ public RexBuilder getRexBuilder(){
+ return cluster.getRexBuilder();
+ }
+
+ public RelTraitSet getLogicalTraits(){
+ RelTraitSet set = RelTraitSet.createEmpty();
+ set.add(DrillRel.CONVENTION);
+ return set;
+ }
+
+ public RelNode toRel(LogicalOperator operator) throws InvalidRelException{
+ return operator.accept(VISITOR, this);
+ }
+
+ public RexNode toRex(LogicalExpression e){
+ return null;
+ }
+
+ public RelDataTypeFactory getTypeFactory(){
+ return cluster.getTypeFactory();
+ }
+
+ public RelOptTable getTable(Scan scan){
+ FieldList list = getFieldList(scan);
+
+ return null;
+ }
+
+ @Override
+ public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class ConverterVisitor extends AbstractLogicalVisitor<RelNode, ConversionContext, InvalidRelException>{
+
+ @Override
+ public RelNode visitScan(Scan scan, ConversionContext context){
+ return DrillScanRel.convert(scan, context);
+ }
+
+ @Override
+ public RelNode visitFilter(Filter filter, ConversionContext context) throws InvalidRelException{
+ return DrillFilterRel.convert(filter, context);
+ }
+
+ @Override
+ public RelNode visitProject(Project project, ConversionContext context) throws InvalidRelException{
+ return DrillProjectRel.convert(project, context);
+ }
+
+ @Override
+ public RelNode visitOrder(Order order, ConversionContext context) throws InvalidRelException{
+ return DrillSortRel.convert(order, context);
+ }
+
+ @Override
+ public RelNode visitJoin(Join join, ConversionContext context) throws InvalidRelException{
+ return DrillJoinRel.convert(join, context);
+ }
+
+ @Override
+ public RelNode visitLimit(Limit limit, ConversionContext context) throws InvalidRelException{
+ return DrillLimitRel.convert(limit, context);
+ }
+
+ @Override
+ public RelNode visitUnion(Union union, ConversionContext context) throws InvalidRelException{
+ return DrillUnionRel.convert(union, context);
+ }
+
+ @Override
+ public RelNode visitGroupingAggregate(GroupingAggregate groupBy, ConversionContext context)
+ throws InvalidRelException {
+ return DrillAggregateRel.convert(groupBy, context);
+ }
+
+ }
+
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 991ade6fe..ae21a3cde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -36,6 +36,15 @@ public class BatchSchema implements Iterable<MaterializedField> {
return new SchemaBuilder();
}
+ public int getFieldCount(){
+ return fields.size();
+ }
+
+ public MaterializedField getColumn(int index){
+ if(index < 0 || index >= fields.size()) return null;
+ return fields.get(index);
+ }
+
@Override
public Iterator<MaterializedField> iterator() {
return fields.iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 8d952e32f..6e764a886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -138,17 +138,18 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
@Override
- @SuppressWarnings("unchecked")
public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
VectorWrapper<?> va = wrappers.get(fieldId);
- assert va != null;
- if (va.getVectorClass() != clazz) {
+ if(va!= null && clazz == null){
+ return (VectorWrapper<?>) va;
+ }
+ if (va != null && va.getVectorClass() != clazz) {
logger.warn(String.format(
"Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
return null;
}
- return (VectorWrapper) va;
+ return (VectorWrapper<?>) va;
}
public BatchSchema getSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 4a43a9948..1d9fecfd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -104,7 +104,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
- SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
}
@@ -201,6 +201,10 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
}
+ public void setAutoRead(boolean enableAutoRead){
+ connection.setAutoRead(enableAutoRead);
+ }
+
public void close() {
logger.debug("Closing client");
connection.getChannel().close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index a0b5cfd65..c665949a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import com.google.protobuf.Internal.EnumLite;
import com.google.protobuf.MessageLite;
@@ -48,10 +49,10 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
@Override
protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
- return handle(rpcType, pBody, dBody);
+ return handleReponse( (ConnectionThrottle) connection, rpcType, pBody, dBody);
}
- protected abstract Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
+ protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
@Override
public ServerConnection initRemoteConnection(Channel channel) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 0eaade8f8..a19f8d833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -22,8 +22,9 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-public abstract class RemoteConnection{
+public abstract class RemoteConnection implements ConnectionThrottle{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
private final WriteManager writeManager;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
new file mode 100644
index 000000000..e2ffcc0bc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+public interface ConnectionThrottle {
+ public void setAutoRead(boolean enableAutoRead);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5b4a504da..5345b31a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -51,7 +51,7 @@ public class QueryResultHandler {
return new SubmissionListener(listener);
}
- public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException {
final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
final QueryResultBatch batch = new QueryResultBatch(result, dBody);
UserResultsListener l = resultsListener.get(result.getQueryId());
@@ -72,7 +72,7 @@ public class QueryResultHandler {
l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
resultsListener.remove(result.getQueryId(), l);
}else{
- l.resultArrived(batch);
+ l.resultArrived(batch, throttle);
}
if (
@@ -100,13 +100,13 @@ public class QueryResultHandler {
private volatile boolean finished = false;
private volatile RpcException ex;
private volatile UserResultsListener output;
-
+ private volatile ConnectionThrottle throttle;
public boolean transferTo(UserResultsListener l) {
synchronized (this) {
output = l;
boolean last = false;
for (QueryResultBatch r : results) {
- l.resultArrived(r);
+ l.resultArrived(r, throttle);
last = r.getHeader().getIsLastChunk();
}
if(ex != null){
@@ -119,14 +119,15 @@ public class QueryResultHandler {
@Override
- public void resultArrived(QueryResultBatch result) {
+ public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+ this.throttle = throttle;
if(result.getHeader().getIsLastChunk()) finished = true;
synchronized (this) {
if (output == null) {
this.results.add(result);
} else {
- output.resultArrived(result);
+ output.resultArrived(result, throttle);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index f28ff4b81..8b1bdecd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -70,10 +70,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
- protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ protected Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
switch (rpcType) {
case RpcType.QUERY_RESULT_VALUE:
- queryResultHandler.batchArrived(pBody, dBody);
+ queryResultHandler.batchArrived(throttle, pBody, dBody);
return new Response(RpcType.ACK, Ack.getDefaultInstance());
default:
throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 3bcd0cf4b..71f59948e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -24,6 +24,6 @@ public interface UserResultsListener {
public abstract void queryIdArrived(QueryId queryId);
public abstract void submissionFailed(RpcException ex);
- public abstract void resultArrived(QueryResultBatch result);
+ public abstract void resultArrived(QueryResultBatch result, ConnectionThrottle throttle);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 0f45dbee7..aba9ab702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -94,7 +94,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
default:
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 39821e39f..60a7d5c83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -23,12 +23,16 @@ import java.util.Collection;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FunctionRegistry;
import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.planner.sql.DrillSchemaFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -37,7 +41,9 @@ import org.apache.drill.exec.store.StorageEngine;
import org.apache.drill.exec.store.StorageEngineRegistry;
import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
+import com.google.common.io.Resources;
public class DrillbitContext {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
@@ -53,6 +59,9 @@ public class DrillbitContext {
private final OperatorCreatorRegistry operatorCreatorRegistry;
private final Controller controller;
private final WorkEventBus workBus;
+ private final FunctionImplementationRegistry functionRegistry;
+ private final FunctionRegistry functionRegistryX;
+ private final DrillSchemaFactory factory;
public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) {
super();
@@ -70,8 +79,34 @@ public class DrillbitContext {
this.storageEngineRegistry = new StorageEngineRegistry(this);
this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry);
this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
+ this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
+
+ DrillSchemaFactory factory = null;
+ try{
+ String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+ StorageEngines engines = context.getConfig().getMapper().readValue(enginesData, StorageEngines.class);
+ factory = new DrillSchemaFactory(engines, context.getConfig());
+ }catch(Exception e){
+ logger.error("Failure reading storage engines data. Creating empty list of schemas.", e);
+ factory = DrillSchemaFactory.createEmpty();
+ }
+ this.factory = factory;
+ this.functionRegistryX = new FunctionRegistry(context.getConfig());
+
}
+ public DrillSchemaFactory getSchemaFactory(){
+ return factory;
+ }
+
+ public FunctionRegistry getFunctionRegistry(){
+ return functionRegistryX;
+ }
+
+ public FunctionImplementationRegistry getFunctionImplementationRegistry() {
+ return functionRegistry;
+ }
+
public WorkEventBus getWorkBus(){
return workBus;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 1756d9673..cb05273b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -116,7 +116,7 @@ public class ParquetGroupScan extends AbstractGroupScan {
calculateEndpointBytes();
}
- public ParquetGroupScan(ArrayList<ReadEntryWithPath> entries,
+ public ParquetGroupScan(List<ReadEntryWithPath> entries,
ParquetStorageEngine storageEngine, FieldReference ref) throws IOException {
this.storageEngine = storageEngine;
this.engineConfig = storageEngine.getEngineConfig();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
index 86be49e32..c17a9e356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java
@@ -52,7 +52,7 @@ public class ParquetSchemaProvider implements SchemaProvider{
@Override
public Object getSelectionBaseOnName(String tableName) {
try{
-// if(!fs.exists(new Path(tableName))) return null;
+ if(!fs.exists(new Path(tableName))) return null;
ReadEntryWithPath re = new ReadEntryWithPath(tableName);
return Lists.newArrayList(re);
}catch(Exception e){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
index e04d3e969..45b9cc1b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -99,7 +99,7 @@ public class ParquetStorageEngine extends AbstractStorageEngine{
ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
new TypeReference<ArrayList<ReadEntryWithPath>>() {});
- return new ParquetGroupScan(readEntries, this, scan.getOutputReference());
+ return new ParquetGroupScan( Lists.newArrayList(readEntries), this, scan.getOutputReference());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index cbc8b86c7..4f8cd3373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -167,6 +167,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
return ((data.getByte((int) Math.floor(index / 8)) & (int) Math.pow(2, (index % 8))) == 0) ? 0 : 1;
}
+ public boolean isNull(int index){
+ return false;
+ }
+
@Override
public final Object getObject(int index) {
return new Boolean(get(index) != 0);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 4e5364a91..a3d3a8a9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -125,6 +125,8 @@ public interface ValueVector extends Closeable {
public abstract Object getObject(int index);
public int getValueCount();
+
+ public boolean isNull(int index);
public void reset();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java
new file mode 100644
index 000000000..93089e7b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+
+abstract class AbstractSqlAccessor implements SqlAccessor {
+
+ @Override
+ public abstract boolean isNull(int index);
+
+ @Override
+ public BigDecimal getBigDecimal(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("BigDecimal");
+ }
+
+ @Override
+ public boolean getBoolean(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("boolean");
+ }
+
+ @Override
+ public byte getByte(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("byte");
+ }
+
+ @Override
+ public byte[] getBytes(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("byte[]");
+ }
+
+ @Override
+ public Date getDate(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Date");
+ }
+
+ @Override
+ public double getDouble(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("double");
+ }
+
+ @Override
+ public float getFloat(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("float");
+ }
+
+ @Override
+ public int getInt(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("int");
+ }
+
+ @Override
+ public long getLong(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("long");
+ }
+
+ @Override
+ public short getShort(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("short");
+ }
+
+ @Override
+ public InputStream getStream(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("InputStream");
+ }
+
+ @Override
+ public char getChar(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Char");
+ }
+
+ @Override
+ public Reader getReader(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Reader");
+ }
+
+ @Override
+ public String getString(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("String");
+ }
+
+ @Override
+ public Time getTime(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Time");
+ }
+
+ @Override
+ public Timestamp getTimestamp(int index) throws InvalidAccessException{
+ throw new InvalidAccessException("Timestamp");
+ }
+
+ abstract MajorType getType();
+
+
+ public class InvalidAccessException extends SQLException{
+ public InvalidAccessException(String name){
+ super(String.format("Requesting class of type %s for an object of type %s:%s is not allowed.", name, getType().getMinorType().name(), getType().getMode().name()));
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
new file mode 100644
index 000000000..56113540f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.accessor;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+import org.apache.drill.exec.vector.accessor.AbstractSqlAccessor.InvalidAccessException;
+
+public interface SqlAccessor {
+
+ public abstract boolean isNull(int index);
+
+ public abstract BigDecimal getBigDecimal(int index) throws InvalidAccessException;
+
+ public abstract boolean getBoolean(int index) throws InvalidAccessException;
+
+ public abstract byte getByte(int index) throws InvalidAccessException;
+
+ public abstract byte[] getBytes(int index) throws InvalidAccessException;
+
+ public abstract Date getDate(int index) throws InvalidAccessException;
+
+ public abstract double getDouble(int index) throws InvalidAccessException;
+
+ public abstract float getFloat(int index) throws InvalidAccessException;
+
+ public abstract char getChar(int index) throws InvalidAccessException;
+
+ public abstract int getInt(int index) throws InvalidAccessException;
+
+ public abstract long getLong(int index) throws InvalidAccessException;
+
+ public abstract short getShort(int index) throws InvalidAccessException;
+
+ public abstract InputStream getStream(int index) throws InvalidAccessException;
+
+ public abstract Reader getReader(int index) throws InvalidAccessException;
+
+ public abstract String getString(int index) throws InvalidAccessException;
+
+ public abstract Time getTime(int index) throws InvalidAccessException;
+
+ public abstract Timestamp getTimestamp(int index) throws InvalidAccessException;
+
+ public abstract Object getObject(int index) throws InvalidAccessException;
+
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 29f011f62..b6d441cff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -119,6 +119,7 @@ public class WorkManager implements Closeable{
public class WorkerBee{
public void addFragmentRunner(FragmentExecutor runner){
+ logger.debug("Adding pending task {}", runner);
pendingTasks.add(runner);
}
@@ -167,7 +168,7 @@ public class WorkManager implements Closeable{
try {
while(true){
// logger.debug("Polling for pending work tasks.");
- Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS);
+ Runnable r = pendingTasks.take();
if(r != null){
logger.debug("Starting pending task {}", r);
executor.execute(r);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index e4f8e7b9c..329815d20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,43 +17,58 @@
*/
package org.apache.drill.exec.work.foreman;
+import io.netty.buffer.ByteBuf;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.exception.OptimizerException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.proto.UserProtos.QueryResult;
import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.util.AtomicState;
+import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.exec.work.ErrorHelper;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
/**
@@ -151,6 +166,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
case LOGICAL:
parseAndRunLogicalPlan(queryRequest.getPlan());
+
break;
case PHYSICAL:
parseAndRunPhysicalPlan(queryRequest.getPlan());
@@ -170,8 +186,18 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
try {
LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
+
+ if(logicalPlan.getProperties().resultMode == ResultMode.LOGICAL){
+ fail("Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC.", new Exception());
+ }
if(logger.isDebugEnabled()) logger.debug("Logical {}", logicalPlan.unparse(context.getConfig()));
PhysicalPlan physicalPlan = convert(logicalPlan);
+
+ if(logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL){
+ returnPhysical(physicalPlan);
+ return;
+ }
+
if(logger.isDebugEnabled()) logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan));
runPhysicalPlan(physicalPlan);
} catch (IOException e) {
@@ -181,6 +207,74 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
}
+
+ private void returnLogical(LogicalPlan plan){
+ String jsonPlan = plan.toJsonStringSafe(context.getConfig());
+ sendSingleString("logical", jsonPlan);
+ }
+
+ private void returnPhysical(PhysicalPlan plan){
+ String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
+ sendSingleString("physical", jsonPlan);
+ }
+
+ private void sendSingleString(String columnName, String value){
+ MaterializedField f = MaterializedField.create(new SchemaPath(columnName, ExpressionPosition.UNKNOWN), Types.required(MinorType.VARCHAR));
+ VarCharVector vector = new VarCharVector(f, bee.getContext().getAllocator());
+ byte[] bytes = value.getBytes(Charsets.UTF_8);
+ vector.allocateNew(bytes.length, 1);
+ vector.getMutator().set(0, bytes);
+ vector.getMutator().setValueCount(1);
+ QueryResult header = QueryResult.newBuilder() //
+ .setQueryId(context.getQueryId()) //
+ .setRowCount(1) //
+ .setDef(RecordBatchDef.newBuilder().addField(vector.getMetadata()).build()) //
+ .setIsLastChunk(false) //
+ .build();
+ QueryWritableBatch b1 = new QueryWritableBatch(header, vector.getBuffers());
+ vector.close();
+
+ QueryResult header2 = QueryResult.newBuilder() //
+ .setQueryId(context.getQueryId()) //
+ .setRowCount(0) //
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ QueryWritableBatch b2 = new QueryWritableBatch(header2);
+
+ SingleListener l = new SingleListener();
+ this.initiatingClient.sendResult(l, b1);
+ this.initiatingClient.sendResult(l, b2);
+ l.acct.waitForSendComplete();
+
+ }
+
+
+ class SingleListener implements RpcOutcomeListener<Ack>{
+
+ final SendingAccountor acct;
+
+ public SingleListener(){
+ acct = new SendingAccountor();
+ acct.increment();
+ acct.increment();
+ }
+
+ @Override
+ public void failed(RpcException ex) {
+ acct.decrement();
+ fail("Failure while sending single result.", ex);
+ }
+
+ @Override
+ public void success(Ack value, ByteBuf buffer) {
+ acct.decrement();
+ }
+
+ }
+
+
+
private void parseAndRunPhysicalPlan(String json) {
try {
PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
@@ -192,7 +286,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
private void runPhysicalPlan(PhysicalPlan plan) {
+ if(plan.getProperties().resultMode != ResultMode.EXEC){
+ fail(String.format("Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception());
+ }
PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+
MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor();
Fragment rootFragment;
try {
@@ -218,18 +316,26 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
List<PlanFragment> intermediateFragments = Lists.newArrayList();
// store fragments in distributed grid.
+ logger.debug("Storing fragments");
for (PlanFragment f : work.getFragments()) {
// store all fragments in grid since they are part of handshake.
+
context.getCache().storeFragment(f);
if (f.getLeafFragment()) {
leafFragments.add(f);
} else {
intermediateFragments.add(f);
}
+
+
}
+ logger.debug("Fragments stored.");
+
+ logger.debug("Submitting fragments to run.");
fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments);
+ logger.debug("Fragments running.");
} catch (ExecutionSetupException | RpcException e) {
@@ -238,11 +344,32 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
- private void runSQL(String json) {
- throw new UnsupportedOperationException();
+ private void runSQL(String sql) {
+ try{
+ DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getSchemaFactory(), context.getFunctionRegistry());
+ LogicalPlan plan = sqlWorker.getPlan(sql);
+
+
+ if(plan.getProperties().resultMode == ResultMode.LOGICAL){
+ returnLogical(plan);
+ return;
+ }
+
+ PhysicalPlan physical = convert(plan);
+
+ if(plan.getProperties().resultMode == ResultMode.PHYSICAL){
+ returnPhysical(physical);
+ return;
+ }
+
+ runPhysicalPlan(physical);
+ }catch(Exception e){
+ fail("Failure while parsing sql.", e);
+ }
}
private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
+ if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index e9302e1fc..eaf921dfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -76,18 +76,25 @@ class QueryManager implements FragmentStatusListener{
}
public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{
+ logger.debug("Setting up fragment runs.");
remainingFragmentCount.set(leafFragments.size()+1);
queryId = rootFragment.getHandle().getQueryId();
workBus = bee.getContext().getWorkBus();
// set up the root fragment first so we'll have incoming buffers available.
{
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+ logger.debug("Setting up root context.");
+ FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext().getFunctionImplementationRegistry());
+ logger.debug("Setting up incoming buffers");
IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
+ logger.debug("Setting buffers on root context.");
rootContext.setBuffers(buffers);
+ logger.debug("Generating Exec tree");
RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
+ logger.debug("Exec tree generated.");
// add fragment to local node.
map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true));
+ logger.debug("Fragment added to local node.");
rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment));
RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
@@ -99,6 +106,7 @@ class QueryManager implements FragmentStatusListener{
workBus.setRootFragmentManager(fragmentManager);
}
+
}
// keep track of intermediate fragments (not root or leaf)
@@ -112,6 +120,7 @@ class QueryManager implements FragmentStatusListener{
sendRemoteFragment(f);
}
+ logger.debug("Fragment runs setup is complete.");
}
private void sendRemoteFragment(PlanFragment fragment){