diff options
Diffstat (limited to 'exec/java-exec')
92 files changed, 4134 insertions, 251 deletions
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 9ea8304ca..a016afc2e 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -44,12 +44,10 @@ <dependency> <groupId>xerces</groupId> <artifactId>xercesImpl</artifactId> - <version>2.9.1</version> </dependency> <dependency> <groupId>xalan</groupId> <artifactId>xalan</artifactId> - <version>2.7.1</version> </dependency> <dependency> <groupId>com.sun.codemodel</groupId> @@ -58,12 +56,12 @@ </dependency> <dependency> <groupId>org.codehaus.janino</groupId> - <artifactId>commons-compiler-jdk</artifactId> - <version>2.6.1</version> + <artifactId>janino</artifactId> + <version>2.7.3</version> </dependency> <dependency> <groupId>net.hydromatic</groupId> - <artifactId>optiq</artifactId> + <artifactId>optiq-core</artifactId> </dependency> <dependency> <groupId>org.freemarker</groupId> @@ -139,8 +137,8 @@ </dependency> <dependency> <groupId>org.apache.drill.exec</groupId> - <version>4.0.7.Final</version> - <artifactId>drill-netty-bufferl</artifactId> + <version>${project.version}</version> + <artifactId>drill-buffers</artifactId> </dependency> <dependency> <groupId>org.apache.drill</groupId> @@ -196,7 +194,7 @@ <dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> - <version>3.1</version> + <version>3.1.4</version> </dependency> <dependency> <groupId>org.codehaus.janino</groupId> diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl index 3e5a156f7..5083f74c8 100644 --- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl +++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl @@ -18,6 +18,7 @@ import com.google.common.base.Charsets; import com.google.common.collect.ObjectArrays; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; import org.apache.commons.lang3.ArrayUtils; @@ -39,6 +40,8 @@ import java.util.Random; import java.util.List; import java.io.Closeable; +import java.io.InputStream; +import java.io.InputStreamReader; diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index c357dd6b9..2d81299ad 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -155,6 +155,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F return valueCount; } + public boolean isNull(int index){ + return false; + } + <#if (type.width > 8)> public ${minor.javaType!type.javaType} get(int index) { diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index 127c6fd48..4677374a7 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -243,7 +243,12 @@ package org.apache.drill.exec.vector; </#if> get(int index, int positionIndex) { return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex); } + + public boolean isNull(int index){ + return false; + } + public void get(int index, Repeated${minor.class}Holder holder){ holder.start = offsets.getAccessor().get(index); holder.end = offsets.getAccessor().get(index+1); diff --git a/exec/java-exec/src/main/codegen/templates/SqlAccessors.java b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java new file mode 100644 index 000000000..efaf9a66d --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/SqlAccessors.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +<@pp.dropOutputFile /> +<#list vv.types as type> +<#list type.minor as minor> +<#list ["", "Nullable"] as mode> +<#assign name = mode + minor.class?cap_first /> +<#assign javaType = (minor.javaType!type.javaType) /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/accessor/${name}Accessor.java" /> +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.accessor; + +<#include "/@includes/vv_imports.ftl" /> + +@SuppressWarnings("unused") +public class ${name}Accessor extends AbstractSqlAccessor{ + <#if mode == "Nullable"> + private static final MajorType TYPE = Types.optional(MinorType.${minor.class?upper_case}); + <#else> + private static final MajorType TYPE = Types.required(MinorType.${minor.class?upper_case}); + </#if> + + private final ${name}Vector.Accessor ac; + + public ${name}Accessor(${name}Vector vector){ + this.ac = vector.getAccessor(); + } + + public Object getObject(int index){ + return ac.getObject(index); + } + + <#if type.major == "VarLen"> + + @Override + public InputStream getStream(int index){ + ${name}Holder h = new ${name}Holder(); + ac.get(index, h); + return new ByteBufInputStream(h.buffer.slice(h.start, h.end)); + } + + @Override + public byte[] getBytes(int index){ + return ac.get(index); + } + + <#switch minor.class> + <#case "VarBinary"> + <#break> + <#case "VarChar"> + @Override + public InputStreamReader getReader(int index){ + return new InputStreamReader(getStream(index), Charsets.UTF_8); + } + + @Override + public String getString(int index){ + return new String(getBytes(index), Charsets.UTF_8); + } + + + <#break> + <#case "Var16Char"> + @Override + public InputStreamReader getReader(int index){ + return new InputStreamReader(getStream(index), Charsets.UTF_16); + } + + @Override + public String getString(int index){ + return new String(getBytes(index), Charsets.UTF_16); + } + + + <#break> + <#default> + This is uncompilable code + </#switch> + + <#else> + @Override + public ${javaType} get${javaType?cap_first}(int index){ + return ac.get(index); + } + </#if> + + @Override + public boolean isNull(int index){ + return false; + } + + @Override + MajorType getType(){return TYPE;}; + +} + + +</#list> +</#list> +</#list>
\ No newline at end of file diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java index 783f9435d..8c72cf76b 100644 --- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java +++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java @@ -23,7 +23,7 @@ package org.apache.drill.exec.expr; <#include "/@includes/vv_imports.ftl" /> - +import org.apache.drill.exec.vector.accessor.*; public class TypeHelper { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class); @@ -47,6 +47,25 @@ public class TypeHelper { throw new UnsupportedOperationException(); } + public static SqlAccessor getSqlAccessor(ValueVector vector){ + switch(vector.getField().getType().getMinorType()){ + <#list vv.types as type> + <#list type.minor as minor> + case ${minor.class?upper_case}: + switch (vector.getField().getType().getMode()) { + case REQUIRED: + return new ${minor.class}Accessor((${minor.class}Vector) vector); + case OPTIONAL: + return new Nullable${minor.class}Accessor((Nullable${minor.class}Vector) vector); + case REPEATED: + throw new UnsupportedOperationException(); + } + </#list> + </#list> + } + throw new UnsupportedOperationException(); + } + public static Class<?> getValueVectorClass(MinorType type, DataMode mode){ switch (type) { <#list vv.types as type> diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index b059d89fa..5cd83afec 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -225,14 +225,35 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V holder.buffer = data; } + + <#switch minor.class> + <#case "VarChar"> + public Object getObject(int index) { + return new String(get(index), Charsets.UTF_8); + } + <#break> + <#case "Var16Char"> + public Object getObject(int index) { + return new String(get(index), Charsets.UTF_16); + } + <#break> + <#default> public Object getObject(int index) { return get(index); } + + </#switch> + + public int getValueCount() { return valueCount; } + public boolean isNull(int index){ + return false; + } + public UInt${type.width}Vector getOffsetVector(){ return offsetVector; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java index 2f0ab2ea4..5f370ff34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java @@ -35,6 +35,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; import com.hazelcast.config.SerializerConfig; import com.hazelcast.core.DuplicateInstanceNameException; import com.hazelcast.core.Hazelcast; @@ -132,6 +133,10 @@ public class HazelCache implements DistributedCache { @Override public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) { IMap<String, V> imap = this.instance.getMap(clazz.toString()); + MapConfig myMapConfig = new MapConfig(); + myMapConfig.setBackupCount(0); + myMapConfig.setReadBackupData(true); + instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig); return new HCDistributedMapImpl<V>(imap, clazz); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 74eed9424..1297cb343 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL; import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocatorL; import io.netty.channel.nio.NioEventLoopGroup; import java.io.Closeable; @@ -37,11 +35,18 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.proto.UserProtos.QueryType; -import org.apache.drill.exec.rpc.*; +import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection; +import org.apache.drill.exec.rpc.ChannelClosedException; +import org.apache.drill.exec.rpc.DrillRpcFuture; +import org.apache.drill.exec.rpc.NamedThreadFactory; +import org.apache.drill.exec.rpc.RpcConnectionHandler; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserClient; import org.apache.drill.exec.rpc.user.UserResultsListener; @@ -52,7 +57,7 @@ import com.google.common.util.concurrent.SettableFuture; /** * Thin wrapper around a UserClient that handles connect/close and transforms String into ByteBuf */ -public class DrillClient implements Closeable{ +public class DrillClient implements Closeable, ConnectionThrottle{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class); DrillConfig config; @@ -86,17 +91,28 @@ public class DrillClient implements Closeable{ return config; } + @Override + public void setAutoRead(boolean enableAutoRead) { + client.setAutoRead(enableAutoRead); + } + + + /** * Connects the client to a Drillbit server * * @throws IOException */ - public synchronized void connect() throws RpcException { + public void connect() throws RpcException { + connect((String) null); + } + + public synchronized void connect(String connect) throws RpcException { if (connected) return; if (clusterCoordinator == null) { try { - this.clusterCoordinator = new ZKClusterCoordinator(this.config); + this.clusterCoordinator = new ZKClusterCoordinator(this.config, connect); this.clusterCoordinator.start(10000); } catch (Exception e) { throw new RpcException("Failure setting up ZK for client.", e); @@ -172,6 +188,11 @@ public class DrillClient implements Closeable{ return listener.getResults(); } + public void cancelQuery(QueryId id){ + client.send(RpcType.CANCEL_QUERY, id, Ack.class); + } + + /** * Submits a Logical plan for direct execution (bypasses parsing) * @@ -217,7 +238,7 @@ public class DrillClient implements Closeable{ } @Override - public void resultArrived(QueryResultBatch result) { + public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { // logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result); results.add(result); if(result.getHeader().getIsLastChunk()){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index 7adefdb10..b07b3aee7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -17,43 +17,33 @@ */ package org.apache.drill.exec.client; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; -import com.beust.jcommander.Parameters; -import com.beust.jcommander.internal.Lists; -import com.google.common.base.Charsets; -import com.google.common.base.Stopwatch; -import com.google.common.io.Resources; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang.StringUtils; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.rpc.RemoteRpcException; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.util.VectorUtil; -import org.apache.drill.exec.vector.ValueVector; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.CharsetDecoder; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; public class QuerySubmitter { @@ -99,42 +89,50 @@ public class QuerySubmitter { public int submitQuery(String planLocation, String type, String zkQuorum, boolean local, int bits) throws Exception { DrillConfig config = DrillConfig.create(); - DrillClient client; - if (local) { - RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); - Drillbit[] drillbits = new Drillbit[bits]; - for (int i = 0; i < bits; i++) { - drillbits[i] = new Drillbit(config, serviceSet); - drillbits[i].run(); + DrillClient client = null; + + try{ + if (local) { + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit[] drillbits = new Drillbit[bits]; + for (int i = 0; i < bits; i++) { + drillbits[i] = new Drillbit(config, serviceSet); + drillbits[i].run(); + } + client = new DrillClient(config, serviceSet.getCoordinator()); + } else { + ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum); + clusterCoordinator.start(10000); + client = new DrillClient(config, clusterCoordinator); } - client = new DrillClient(config, serviceSet.getCoordinator()); - } else { - ZKClusterCoordinator clusterCoordinator = new ZKClusterCoordinator(config, zkQuorum); - clusterCoordinator.start(10000); - client = new DrillClient(config, clusterCoordinator); - } - client.connect(); - QueryResultsListener listener = new QueryResultsListener(); - String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString(); - UserProtos.QueryType queryType; - type = type.toLowerCase(); - switch(type) { - case "logical": - queryType = UserProtos.QueryType.LOGICAL; - break; - case "physical": - queryType = UserProtos.QueryType.PHYSICAL; - break; - default: - System.out.println("Invalid query type: " + type); - return -1; + client.connect(); + QueryResultsListener listener = new QueryResultsListener(); + String plan = Charsets.UTF_8.decode(ByteBuffer.wrap(Files.readAllBytes(Paths.get(planLocation)))).toString(); + UserProtos.QueryType queryType; + type = type.toLowerCase(); + switch(type) { + case "sql": + queryType = UserProtos.QueryType.SQL; + break; + case "logical": + queryType = UserProtos.QueryType.LOGICAL; + break; + case "physical": + queryType = UserProtos.QueryType.PHYSICAL; + break; + default: + System.out.println("Invalid query type: " + type); + return -1; + } + Stopwatch watch = new Stopwatch(); + watch.start(); + client.runQuery(queryType, plan, listener); + int rows = listener.await(); + System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000)); + return 0; + }finally{ + if(client != null) client.close(); } - Stopwatch watch = new Stopwatch(); - watch.start(); - client.runQuery(queryType, plan, listener); - int rows = listener.await(); - System.out.println(String.format("Got %d record%s in %f seconds", rows, rows > 1 ? "s" : "", (float)watch.elapsed(TimeUnit.MILLISECONDS) / (float)1000)); - return 0; } private class QueryResultsListener implements UserResultsListener { @@ -150,7 +148,7 @@ public class QuerySubmitter { } @Override - public void resultArrived(QueryResultBatch result) { + public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { int rows = result.getHeader().getRowCount(); if (result.getData() != null) { count.addAndGet(rows); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java index 904afb5c4..080679b87 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java @@ -112,10 +112,7 @@ class MergeAdapter extends ClassVisitor { return null; } if(arg3 != null){ - System.out.println("a: " + arg3); arg3 = arg3.replace(set.precompiled.slash, set.generated.slash); - System.out.println("b: " + arg3); - } // if( (access & Modifier.PUBLIC) == 0){ // access = access ^ Modifier.PUBLIC ^ Modifier.PROTECTED | Modifier.PRIVATE; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java index 36433ad29..cbd722c03 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java @@ -24,16 +24,18 @@ import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.IfExpression; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.TypedNullConstant; import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.visitors.SimpleExprVisitor; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.record.NullExpression; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; import com.google.common.collect.Lists; -import org.apache.drill.exec.record.VectorAccessible; - public class ExpressionTreeMaterializer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class); @@ -45,7 +47,12 @@ public class ExpressionTreeMaterializer { public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionImplementationRegistry registry) { LogicalExpression materializedExpr = expr.accept(new MaterializeVisitor(batch, errorCollector), null); - return ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry); + LogicalExpression out = ImplicitCastBuilder.injectImplicitCast(materializedExpr, errorCollector, registry); + if(out instanceof NullExpression){ + return new TypedNullConstant(Types.optional(MinorType.INT)); + }else{ + return out; + } } private static class MaterializeVisitor extends SimpleExprVisitor<LogicalExpression> { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index fd965a704..80a78191d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import net.hydromatic.optiq.SchemaPlus; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.compile.ClassTransformer; import org.apache.drill.exec.compile.QueryClassLoader; @@ -75,6 +77,7 @@ public class FragmentContext implements Closeable { this.connection = connection; this.fragment = fragment; this.funcRegistry = funcRegistry; + logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax()); } @@ -91,6 +94,10 @@ public class FragmentContext implements Closeable { public DrillbitContext getDrillbitContext() { return context; } + + public SchemaPlus getRootSchema(){ + return null; + } /** * Get this node's identity. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 256e7729a..11658e63c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -21,9 +21,11 @@ import java.util.Collection; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FunctionRegistry; import org.apache.drill.common.logical.StorageEngineConfig; import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.sql.DrillSchemaFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.control.WorkEventBus; @@ -81,4 +83,13 @@ public class QueryContext { return workBus; } + public DrillSchemaFactory getSchemaFactory(){ + return drillbitContext.getSchemaFactory(); + } + + public FunctionRegistry getFunctionRegistry(){ + return drillbitContext.getFunctionRegistry(); + + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 6e58ec778..cd59428f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -24,7 +24,6 @@ import java.util.Collection; import java.util.List; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.defs.OrderDef; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; @@ -33,7 +32,6 @@ import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties; import org.apache.drill.common.logical.StorageEngineConfig; import org.apache.drill.common.logical.data.*; -import org.apache.drill.common.logical.data.Order.Direction; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; import org.apache.drill.common.types.TypeProtos; @@ -50,6 +48,8 @@ import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.store.StorageEngine; +import org.eigenbase.rel.RelFieldCollation.Direction; +import org.eigenbase.rel.RelFieldCollation.NullDirection; import com.beust.jcommander.internal.Lists; @@ -123,9 +123,9 @@ public class BasicOptimizer extends Optimizer{ @Override public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException { PhysicalOperator input = order.getInput().accept(this, value); - List<OrderDef> ods = Lists.newArrayList(); + List<Ordering> ods = Lists.newArrayList(); for(Ordering o : order.getOrderings()){ - ods.add(OrderDef.create(o)); + ods.add(o); } return new SelectionVectorRemover(new Sort(input, ods, false)); } @@ -150,13 +150,13 @@ public class BasicOptimizer extends Optimizer{ } // a collapsing aggregate is a currently implemented as a sort followed by a streaming aggregate. - List<OrderDef> orderDefs = Lists.newArrayList(); + List<Ordering> orderDefs = Lists.newArrayList(); List<NamedExpression> keys = Lists.newArrayList(); for(LogicalExpression e : segment.getExprs()){ if( !(e instanceof SchemaPath)) throw new OptimizerException("The basic optimizer doesn't currently support collapsing aggregate where the segment value is something other than a SchemaPath."); keys.add(new NamedExpression(e, new FieldReference((SchemaPath) e))); - orderDefs.add(new OrderDef(Direction.ASC, e)); + orderDefs.add(new Ordering(Direction.Ascending, e, NullDirection.FIRST)); } Sort sort = new Sort(segment.getInput().accept(this, value), orderDefs, false); @@ -169,22 +169,22 @@ public class BasicOptimizer extends Optimizer{ @Override public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException { PhysicalOperator leftOp = join.getLeft().accept(this, value); - List<OrderDef> leftOrderDefs = Lists.newArrayList(); + List<Ordering> leftOrderDefs = Lists.newArrayList(); for(JoinCondition jc : join.getConditions()){ - leftOrderDefs.add(new OrderDef(Direction.ASC, jc.getLeft())); + leftOrderDefs.add(new Ordering(Direction.Ascending, jc.getLeft())); } leftOp = new Sort(leftOp, leftOrderDefs, false); leftOp = new SelectionVectorRemover(leftOp); PhysicalOperator rightOp = join.getRight().accept(this, value); - List<OrderDef> rightOrderDefs = Lists.newArrayList(); + List<Ordering> rightOrderDefs = Lists.newArrayList(); for(JoinCondition jc : join.getConditions()){ - rightOrderDefs.add(new OrderDef(Direction.ASC, jc.getRight())); + rightOrderDefs.add(new Ordering(Direction.Ascending, jc.getRight())); } rightOp = new Sort(rightOp, rightOrderDefs, false); rightOp = new SelectionVectorRemover(rightOp); - MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJointType()); + MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType()); return new SelectionVectorRemover(mjp); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java index 76916ea44..fde88a9b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergeJoinPOP.java @@ -21,13 +21,14 @@ import java.util.Iterator; import java.util.List; import org.apache.drill.common.logical.data.Join; -import org.apache.drill.common.logical.data.Join.JoinType; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.Size; +import org.eigenbase.rel.JoinRelType; +import org.eigenbase.sql.SqlJoinOperator.JoinType; import com.beust.jcommander.internal.Lists; import com.fasterxml.jackson.annotation.JsonCreator; @@ -44,7 +45,7 @@ public class MergeJoinPOP extends AbstractBase{ private final PhysicalOperator left; private final PhysicalOperator right; private final List<JoinCondition> conditions; - private final Join.JoinType joinType; + private final JoinRelType joinType; @Override public OperatorCost getCost() { @@ -56,7 +57,7 @@ public class MergeJoinPOP extends AbstractBase{ @JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, @JsonProperty("join-conditions") List<JoinCondition> conditions, - @JsonProperty("join-type") Join.JoinType joinType + @JsonProperty("join-type") JoinRelType joinType ) { this.left = left; this.right = right; @@ -93,7 +94,7 @@ public class MergeJoinPOP extends AbstractBase{ return right; } - public Join.JoinType getJoinType() { + public JoinRelType getJoinType() { return joinType; } @@ -102,12 +103,12 @@ public class MergeJoinPOP extends AbstractBase{ } public MergeJoinPOP flipIfRight(){ - if(joinType == JoinType.RIGHT){ + if(joinType == JoinRelType.RIGHT){ List<JoinCondition> flippedConditions = Lists.newArrayList(conditions.size()); for(JoinCondition c : conditions){ flippedConditions.add(c.flip()); } - return new MergeJoinPOP(right, left, flippedConditions, JoinType.LEFT); + return new MergeJoinPOP(right, left, flippedConditions, JoinRelType.LEFT); }else{ return this; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java index 667cc33a4..549c65c1c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java @@ -17,18 +17,18 @@ */ package org.apache.drill.exec.physical.config; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.drill.common.defs.OrderDef; -import org.apache.drill.common.expression.LogicalExpression; +import java.util.List; + +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import java.util.List; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; // The goal of this operator is to produce outgoing batches with records // ordered according to the supplied expression. Each incoming batch @@ -39,12 +39,12 @@ public class MergingReceiverPOP extends AbstractReceiver{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverPOP.class); private final List<DrillbitEndpoint> senders; - private final List<OrderDef> orderings; + private final List<Ordering> orderings; @JsonCreator public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, @JsonProperty("senders") List<DrillbitEndpoint> senders, - @JsonProperty("orderings") List<OrderDef> orderings) { + @JsonProperty("orderings") List<Ordering> orderings) { super(oppositeMajorFragmentId); this.senders = senders; this.orderings = orderings; @@ -78,7 +78,7 @@ public class MergingReceiverPOP extends AbstractReceiver{ return new Size(1,1); } - public List<OrderDef> getOrderings() { + public List<Ordering> getOrderings() { return orderings; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java index f0aa85bdc..c49509f40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java @@ -17,27 +17,27 @@ */ package org.apache.drill.exec.physical.config; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import org.apache.drill.common.defs.OrderDef; +import java.util.List; + import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import java.util.List; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; @JsonTypeName("ordered-partition-exchange") public class OrderedPartitionExchange extends AbstractExchange { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionExchange.class); - private final List<OrderDef> orderings; + private final List<Ordering> orderings; private final FieldReference ref; private int recordsToSample = 10000; // How many records must be received before analyzing private int samplingFactor = 10; // Will collect SAMPLING_FACTOR * number of partitions to send to distributed cache @@ -48,7 +48,7 @@ public class OrderedPartitionExchange extends AbstractExchange { private List<DrillbitEndpoint> receiverLocations; @JsonCreator - public OrderedPartitionExchange(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, + public OrderedPartitionExchange(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child, @JsonProperty("recordsToSample") Integer recordsToSample, @JsonProperty("samplingFactor") Integer samplingFactor, @JsonProperty("completionFactor") Float completionFactor) { super(child); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java index de5cf04ef..55632a214 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java @@ -17,24 +17,27 @@ */ package org.apache.drill.exec.physical.config; -import com.beust.jcommander.internal.Lists; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.drill.common.defs.OrderDef; +import java.util.List; + import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.OperatorCost; -import org.apache.drill.exec.physical.base.*; -import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.physical.base.AbstractSender; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import java.util.List; +import com.beust.jcommander.internal.Lists; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("OrderedPartitionSender") public class OrderedPartitionSender extends AbstractSender { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionSender.class); - private final List<OrderDef> orderings; + private final List<Ordering> orderings; private final FieldReference ref; private final List<DrillbitEndpoint> endpoints; private final int sendingWidth; @@ -44,7 +47,7 @@ public class OrderedPartitionSender extends AbstractSender { private float completionFactor; @JsonCreator - public OrderedPartitionSender(@JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child, + public OrderedPartitionSender(@JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("ref") FieldReference ref, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destinations") List<DrillbitEndpoint> endpoints, @JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("sending-fragment-width") int sendingWidth, @JsonProperty("recordsToSample") int recordsToSample, @JsonProperty("samplingFactor") int samplingFactor, @JsonProperty("completionFactor") float completionFactor) { @@ -70,7 +73,7 @@ public class OrderedPartitionSender extends AbstractSender { return endpoints; } - public List<OrderDef> getOrderings() { + public List<Ordering> getOrderings() { return orderings; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java index fbeb6dfb6..b8073c175 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java @@ -18,25 +18,25 @@ package org.apache.drill.exec.physical.config; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.drill.common.defs.OrderDef; +import java.util.List; + import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; -import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.AbstractExchange; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Receiver; import org.apache.drill.exec.physical.base.Sender; import org.apache.drill.exec.proto.CoordinationProtos; -import java.util.List; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName("single-merge-exchange") public class SingleMergeExchange extends AbstractExchange { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleMergeExchange.class); - private final List<OrderDef> orderExpr; + private final List<Ordering> orderExpr; // ephemeral for setup tasks private List<CoordinationProtos.DrillbitEndpoint> senderLocations; @@ -44,7 +44,7 @@ public class SingleMergeExchange extends AbstractExchange { @JsonCreator public SingleMergeExchange(@JsonProperty("child") PhysicalOperator child, - @JsonProperty("orderings") List<OrderDef> orderExpr) { + @JsonProperty("orderings") List<Ordering> orderExpr) { super(child); this.orderExpr = orderExpr; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java index 206e7cd4a..82aa830cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.config; import java.util.List; -import org.apache.drill.common.defs.OrderDef; +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -27,7 +27,6 @@ import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.Size; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @@ -35,17 +34,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class Sort extends AbstractSingle{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class); - private final List<OrderDef> orderings; + private final List<Ordering> orderings; private boolean reverse = false; @JsonCreator - public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) { + public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) { super(child); this.orderings = orderings; this.reverse = reverse; } - public List<OrderDef> getOrderings() { + public List<Ordering> getOrderings() { return orderings; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index aae1a3c21..7c8a51cad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.record.VectorContainer; +import org.eigenbase.rel.JoinRelType; /** * This join template uses a merge join to combine two ordered streams into a single larger batch. When joining @@ -91,7 +92,7 @@ public abstract class JoinTemplate implements JoinWorker { // validate input iterators (advancing to the next record batch if necessary) if (!status.isRightPositionAllowed()) { - if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) { + if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { // we've hit the end of the right record batch; copy any remaining values from the left batch while (status.isLeftPositionAllowed()) { if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) @@ -109,7 +110,7 @@ public abstract class JoinTemplate implements JoinWorker { case -1: // left key < right key - if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == Join.JoinType.LEFT) + if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) if (!doCopyLeft(status.getLeftPosition(), status.fetchAndIncOutputPos())) { return false; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 298b031f8..bd668e748 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -49,6 +49,7 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; +import org.eigenbase.rel.JoinRelType; import com.google.common.collect.ImmutableList; import com.sun.codemodel.JClass; @@ -112,7 +113,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private final RecordBatch right; private final JoinStatus status; private final JoinCondition condition; - private final Join.JoinType joinType; + private final JoinRelType joinType; private JoinWorker worker; public MergeJoinBatchBuilder batchBuilder; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java index 9428b4650..3549a33bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java @@ -17,16 +17,16 @@ */ package org.apache.drill.exec.physical.impl.join; -import com.google.common.base.Preconditions; +import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.logical.data.Join.JoinType; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; +import org.eigenbase.rel.JoinRelType; -import java.util.List; +import com.google.common.base.Preconditions; public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class); @@ -34,7 +34,7 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { @Override public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); - if(config.getJoinType() == JoinType.RIGHT){ + if(config.getJoinType() == JoinRelType.RIGHT){ return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0)); }else{ return new MergeJoinBatch(config, context, children.get(0), children.get(1)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 43d3b457b..72f1ad93b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -25,14 +25,13 @@ import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; -import org.apache.drill.common.defs.OrderDef; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.data.Order.Direction; +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -60,6 +59,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; +import org.eigenbase.rel.RelFieldCollation.Direction; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -367,7 +367,7 @@ public class MergingRecordBatch implements RecordBatch { private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException { // set up the expression evaluator and code generation - final List<OrderDef> orderings = config.getOrderings(); + final List<Ordering> orderings = config.getOrderings(); final ErrorCollector collector = new ErrorCollectorImpl(); final ClassGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry()); @@ -550,11 +550,11 @@ public class MergingRecordBatch implements RecordBatch { // generate less than/greater than checks (fixing results for ASCending vs. DESCending) cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1))) ._then() - ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? 1 : -1)); + ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? 1 : -1)); cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1))) ._then() - ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASC ? -1 : 1)); + ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.Ascending ? -1 : 1)); ++comparisonVectorIndex; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index da8978f39..381fbe24b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -23,7 +23,6 @@ import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.drill.common.defs.OrderDef; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; @@ -32,6 +31,7 @@ import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.cache.Counter; @@ -69,6 +69,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; +import org.eigenbase.rel.RelFieldCollation.Direction; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -189,7 +190,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions). // Uses the - // the expressions from the OrderDefs to populate each column. There is one column for each OrderDef in + // the expressions from the Orderings to populate each column. There is one column for each Ordering in // popConfig.orderings. VectorContainer containerToCache = new VectorContainer(); @@ -293,11 +294,11 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart VectorContainer allSamplesContainer = new VectorContainer(); containerBuilder.build(context, allSamplesContainer); - List<OrderDef> orderDefs = Lists.newArrayList(); + List<Ordering> orderDefs = Lists.newArrayList(); int i = 0; - for (OrderDef od : popConfig.getOrderings()) { + for (Ordering od : popConfig.getOrderings()) { SchemaPath sp = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN); - orderDefs.add(new OrderDef(od.getDirection(), new FieldReference(sp))); + orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp))); } // sort the data incoming samples. @@ -330,8 +331,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart /** * Creates a copier that does a project for every Nth record from a VectorContainer incoming into VectorContainer - * outgoing. Each OrderDef in orderings generates a column, and evaluation of the expression associated with each - * OrderDef determines the value of each column. These records will later be sorted based on the values in each + * outgoing. Each Ordering in orderings generates a column, and evaluation of the expression associated with each + * Ordering determines the value of each column. These records will later be sorted based on the values in each * column, in the same order as the orderings. * * @param sv4 @@ -342,14 +343,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart * @throws SchemaChangeException */ private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing, - List<OrderDef> orderings) throws SchemaChangeException { + List<Ordering> orderings) throws SchemaChangeException { List<ValueVector> localAllocationVectors = Lists.newArrayList(); final ErrorCollector collector = new ErrorCollectorImpl(); final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry()); int i = 0; - for (OrderDef od : orderings) { + for (Ordering od : orderings) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, context.getFunctionRegistry()); SchemaPath schemaPath = new SchemaPath("f" + i++, ExpressionPosition.UNKNOWN); TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType()) @@ -521,7 +522,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart cg.setMappingSet(mainMapping); int count = 0; - for (OrderDef od : popConfig.getOrderings()) { + for (Ordering od : popConfig.getOrderings()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); if (collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); @@ -538,7 +539,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart ClassGenerator.HoldingContainer out = cg.addExpr(f, false); JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if (od.getDirection() == Order.Direction.ASC) { + if (od.getDirection() == Direction.Ascending) { jc._then()._return(out.getValue()); } else { jc._then()._return(out.getValue().minus()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 3b8154efd..509d13b41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -88,6 +88,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } return ref; } + private boolean isWildcard(NamedExpression ex){ + LogicalExpression expr = ex.getExpr(); + LogicalExpression ref = ex.getRef(); + if(expr instanceof SchemaPath && ref instanceof SchemaPath){ + PathSegment e = ((SchemaPath) expr).getRootSegment(); + PathSegment n = ((SchemaPath) ref).getRootSegment(); + if(e.isNamed() && e.getNameSegment().getPath().equals("*") && n.isNamed() && n.getChild() != null && n.getChild().isNamed() && n.getChild().getNameSegment().getPath().equals("*")){ + return true; + } + } + return false; + } @Override protected void setupNewSchema() throws SchemaChangeException{ @@ -99,33 +111,43 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - for(int i = 0; i < exprs.size(); i++){ - final NamedExpression namedExpression = exprs.get(i); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry()); - final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType()); - if(collector.hasErrors()){ - throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } - - // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. - if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){ - ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; - ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector(); - Preconditions.checkNotNull(incoming); - - TransferPair tp = vvIn.getTransferPair(getRef(namedExpression)); + if(exprs.size() == 1 && isWildcard(exprs.get(0))){ + for(VectorWrapper<?> wrapper : incoming){ + ValueVector vvIn = wrapper.getValueVector(); + TransferPair tp = wrapper.getValueVector().getTransferPair(new FieldReference(vvIn.getField().getName())); transfers.add(tp); container.add(tp.getTo()); - logger.debug("Added transfer."); - }else{ - // need to do evaluation. - ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator()); - allocationVectors.add(vector); - TypedFieldId fid = container.add(vector); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr); - cg.addExpr(write); - logger.debug("Added eval."); } + }else{ + for(int i = 0; i < exprs.size(); i++){ + final NamedExpression namedExpression = exprs.get(i); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry()); + final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType()); + if(collector.hasErrors()){ + throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); + } + + // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. + if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE){ + ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; + ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector(); + Preconditions.checkNotNull(incoming); + + TransferPair tp = vvIn.getTransferPair(getRef(namedExpression)); + transfers.add(tp); + container.add(tp.getTo()); + logger.debug("Added transfer."); + }else{ + // need to do evaluation. + ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator()); + allocationVectors.add(vector); + TypedFieldId fid = container.add(vector); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr); + cg.addExpr(write); + logger.debug("Added eval."); + } + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 4d0473558..5fd12c000 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -20,13 +20,11 @@ package org.apache.drill.exec.physical.impl.sort; import java.io.IOException; import java.util.List; -import org.apache.drill.common.defs.OrderDef; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; -import org.apache.drill.common.logical.data.Order.Direction; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; @@ -46,6 +44,7 @@ import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.eigenbase.rel.RelFieldCollation.Direction; import com.google.common.collect.ImmutableList; import com.sun.codemodel.JConditional; @@ -165,20 +164,20 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return createNewSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); } - public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException { + public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException { final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); final MappingSet leftMapping = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); final MappingSet rightMapping = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); return createNewSorter(context, orderings, batch, mainMapping, leftMapping, rightMapping); } - public static Sorter createNewSorter(FragmentContext context, List<OrderDef> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) + public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) throws ClassTransformationException, IOException, SchemaChangeException{ CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry()); ClassGenerator<Sorter> g = cg.getRoot(); g.setMappingSet(mainMapping); - for(OrderDef od : orderings){ + for(Ordering od : orderings){ // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); @@ -194,7 +193,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { HoldingContainer out = g.addExpr(f, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASC){ + if(od.getDirection() == Direction.Ascending){ jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java new file mode 100644 index 000000000..b4c1bf9ac --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.BitSet; +import java.util.List; + +import net.hydromatic.linq4j.Ord; +import net.hydromatic.optiq.util.BitSets; + +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.logical.data.GroupingAggregate; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.AggregateCall; +import org.eigenbase.rel.AggregateRelBase; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; + +import com.google.common.collect.Lists; + +/** + * Aggregation implemented in Drill. + */ +public class DrillAggregateRel extends AggregateRelBase implements DrillRel { + /** Creates a DrillAggregateRel. */ + public DrillAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, BitSet groupSet, + List<AggregateCall> aggCalls) throws InvalidRelException { + super(cluster, traits, child, groupSet, aggCalls); + for (AggregateCall aggCall : aggCalls) { + if (aggCall.isDistinct()) { + throw new InvalidRelException("DrillAggregateRel does not support DISTINCT aggregates"); + } + } + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + try { + return new DrillAggregateRel(getCluster(), traitSet, sole(inputs), getGroupSet(), aggCalls); + } catch (InvalidRelException e) { + throw new AssertionError(e); + } + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + + GroupingAggregate.Builder builder = GroupingAggregate.builder(); + builder.setInput(implementor.visitChild(this, 0, getChild())); + final List<String> childFields = getChild().getRowType().getFieldNames(); + final List<String> fields = getRowType().getFieldNames(); + + for (int group : BitSets.toIter(groupSet)) { + FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN); + builder.addKey(fr, fr); + } + + for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) { + FieldReference ref = new FieldReference(fields.get(groupSet.cardinality() + aggCall.i)); + LogicalExpression expr = toDrill(aggCall.e, childFields, implementor); + builder.addExpr(ref, expr); + } + + return builder.build(); + } + + + + + private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) { + List<LogicalExpression> args = Lists.newArrayList(); + for(Integer i : call.getArgList()){ + args.add(new FieldReference(fn.get(i))); + } + + // for count(1). + if(args.isEmpty()) args.add(new ValueExpressions.LongExpression(1l)); + LogicalExpression expr = implementor.getContext().getRegistry().createExpression(call.getAggregation().getName().toLowerCase(), ExpressionPosition.UNKNOWN, args); + return expr; + } + + public static DrillAggregateRel convert(GroupingAggregate groupBy, ConversionContext value) + throws InvalidRelException { + throw new UnsupportedOperationException(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java new file mode 100644 index 000000000..77f1ba6d3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRule.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.AggregateRel; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.*; +import org.eigenbase.trace.EigenbaseTrace; + +import java.util.logging.Logger; + +/** + * Rule that converts an {@link AggregateRel} to a {@link DrillAggregateRel}, implemented by a Drill "segment" operation + * followed by a "collapseaggregate" operation. + */ +public class DrillAggregateRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillAggregateRule(); + protected static final Logger tracer = EigenbaseTrace.getPlannerTracer(); + + private DrillAggregateRule() { + super(RelOptHelper.some(AggregateRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillAggregateRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final AggregateRel aggregate = (AggregateRel) call.rel(0); + final RelNode input = call.rel(1); + final RelTraitSet traits = aggregate.getTraitSet().plus(DrillRel.CONVENTION); + final RelNode convertedInput = convert(input, traits); + try { + call.transformTo(new DrillAggregateRel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(), + aggregate.getAggCallList())); + } catch (InvalidRelException e) { + tracer.warning(e.toString()); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java new file mode 100644 index 000000000..cc147e315 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRel.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import org.apache.drill.common.logical.data.Filter; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.FilterRelBase; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexNode; + +/** + * Filter implemented in Drill. + */ +public class DrillFilterRel extends FilterRelBase implements DrillRel { + protected DrillFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) { + super(cluster, traits, child, condition); + assert getConvention() == CONVENTION; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillFilterRel(getCluster(), traitSet, sole(inputs), getCondition()); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(0.1); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + final LogicalOperator input = implementor.visitChild(this, 0, getChild()); + Filter f = new Filter(DrillOptiq.toDrill(implementor.getContext(), getChild(), getCondition())); + f.setInput(input); + return f; + } + + public static DrillFilterRel convert(Filter filter, ConversionContext context) throws InvalidRelException{ + RelNode input = context.toRel(filter.getInput()); + return new DrillFilterRel(context.getCluster(), context.getLogicalTraits(), input, context.toRex(filter.getExpr())); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java new file mode 100644 index 000000000..d4fb2391a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterRule.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.FilterRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.*; + +/** + * Rule that converts a {@link org.eigenbase.rel.FilterRel} to a Drill "filter" operation. + */ +public class DrillFilterRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillFilterRule(); + + private DrillFilterRule() { + super(RelOptHelper.some(FilterRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillFilterRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final FilterRel filter = (FilterRel) call.rel(0); + final RelNode input = call.rel(1); + final RelTraitSet traits = filter.getTraitSet().plus(DrillRel.CONVENTION); + final RelNode convertedInput = convert(input, traits); + call.transformTo(new DrillFilterRel(filter.getCluster(), traits, convertedInput, filter.getCondition())); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java new file mode 100644 index 000000000..acd218c25 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillImplementor.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.Set; + +import org.apache.drill.common.logical.LogicalPlan; +import org.apache.drill.common.logical.LogicalPlanBuilder; +import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; +import org.apache.drill.common.logical.PlanProperties.PlanType; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; +import org.eigenbase.rel.RelNode; + +import com.google.common.collect.Sets; + +/** + * Context for converting a tree of {@link DrillRel} nodes into a Drill logical plan. + */ +public class DrillImplementor { + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillImplementor.class); + + private Set<DrillTable> tables = Sets.newHashSet(); + private LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(); + private LogicalPlan plan; + private final DrillParseContext context; + + + public DrillImplementor(DrillParseContext context, ResultMode mode) { + planBuilder.planProperties(PlanType.APACHE_DRILL_LOGICAL, 1, DrillImplementor.class.getName(), "", mode); + this.context = context; + } + + public DrillParseContext getContext(){ + return context; + } + + public void registerSource(DrillTable table){ + if(tables.add(table)){ + planBuilder.addStorageEngine(table.getStorageEngineName(), table.getStorageEngineConfig()); + } + } + + public void go(DrillRel root) { + LogicalOperator rootLOP = root.implement(this); + rootLOP.accept(new AddOpsVisitor(), null); + } + + public LogicalPlan getPlan(){ + if(plan == null){ + plan = planBuilder.build(); + planBuilder = null; + } + return plan; + } + + public LogicalOperator visitChild(DrillRel parent, int ordinal, RelNode child) { + return ((DrillRel) child).implement(this); + } + + private class AddOpsVisitor extends AbstractLogicalVisitor<Void, Void, RuntimeException> { + @Override + public Void visitOp(LogicalOperator op, Void value) throws RuntimeException { + planBuilder.addLogicalOperator(op); + for(LogicalOperator o : op){ + o.accept(this, null); + } + return null; + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java new file mode 100644 index 000000000..c08712ef6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.Join; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Project; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.JoinRelBase; +import org.eigenbase.rel.JoinRelType; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptUtil; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexUtil; +import org.eigenbase.sql.fun.SqlStdOperatorTable; +import org.eigenbase.util.Pair; + +/** + * Join implemented in Drill. + */ +public class DrillJoinRel extends JoinRelBase implements DrillRel { + private final List<Integer> leftKeys = new ArrayList<>(); + private final List<Integer> rightKeys = new ArrayList<>(); + + /** Creates a DrillJoinRel. */ + public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition, + JoinRelType joinType) throws InvalidRelException { + super(cluster, traits, left, right, condition, joinType, Collections.<String> emptySet()); + + RexNode remaining = RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys); + if (!remaining.isAlwaysTrue()) { + throw new InvalidRelException("DrillJoinRel only supports equi-join"); + } + } + + @Override + public DrillJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType) { + try { + return new DrillJoinRel(getCluster(), traitSet, left, right, condition, joinType); + } catch (InvalidRelException e) { + throw new AssertionError(e); + } + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + final List<String> fields = getRowType().getFieldNames(); + assert isUnique(fields); + final int leftCount = left.getRowType().getFieldCount(); + final List<String> leftFields = fields.subList(0, leftCount); + final List<String> rightFields = fields.subList(leftCount, fields.size()); + + final LogicalOperator leftOp = implementInput(implementor, 0, 0, left); + final LogicalOperator rightOp = implementInput(implementor, 1, leftCount, right); + + Join.Builder builder = Join.builder(); + builder.type(joinType); + builder.left(leftOp); + builder.right(rightOp); + + for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) { + builder.addCondition("==", new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right))); + } + return builder.build(); + } + + /** + * Check to make sure that the fields of the inputs are the same as the output field names. If not, insert a project renaming them. + * @param implementor + * @param i + * @param offset + * @param input + * @return + */ + private LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, RelNode input) { + final LogicalOperator inputOp = implementor.visitChild(this, i, input); + assert uniqueFieldNames(input.getRowType()); + final List<String> fields = getRowType().getFieldNames(); + final List<String> inputFields = input.getRowType().getFieldNames(); + final List<String> outputFields = fields.subList(offset, offset + inputFields.size()); + if (!outputFields.equals(inputFields)) { + // Ensure that input field names are the same as output field names. + // If there are duplicate field names on left and right, fields will get + // lost. + return rename(implementor, inputOp, inputFields, outputFields); + } else { + return inputOp; + } + } + + private LogicalOperator rename(DrillImplementor implementor, LogicalOperator inputOp, List<String> inputFields, List<String> outputFields) { + Project.Builder builder = Project.builder(); + builder.setInput(inputOp); + for (Pair<String, String> pair : Pair.zip(inputFields, outputFields)) { + builder.addExpr(new FieldReference(pair.right), new FieldReference(pair.left)); + } + return builder.build(); + } + + public static DrillJoinRel convert(Join join, ConversionContext context) throws InvalidRelException{ + RelNode left = context.toRel(join.getLeft()); + RelNode right = context.toRel(join.getRight()); + + List<RexNode> joinConditions = new ArrayList<RexNode>(); + // right fields appear after the LHS fields. + final int rightInputOffset = left.getRowType().getFieldCount(); + for (JoinCondition condition : join.getConditions()) { + RelDataTypeField leftField = left.getRowType().getField(ExprHelper.getFieldName(condition.getLeft()), true); + RelDataTypeField rightField = right.getRowType().getField(ExprHelper.getFieldName(condition.getRight()), true); + joinConditions.add( + context.getRexBuilder().makeCall( + SqlStdOperatorTable.EQUALS, + context.getRexBuilder().makeInputRef(leftField.getType(), leftField.getIndex()), + context.getRexBuilder().makeInputRef(rightField.getType(), rightInputOffset + rightField.getIndex()) + ) + ); + } + RexNode rexCondition = RexUtil.composeConjunction(context.getRexBuilder(), joinConditions, false); + return new DrillJoinRel(context.getCluster(), context.getLogicalTraits(), left, right, rexCondition, join.getJoinType()); + } + + + /** + * Returns whether there are any elements in common between left and right. + */ + private static <T> boolean intersects(List<T> left, List<T> right) { + return new HashSet<>(left).removeAll(right); + } + + private boolean uniqueFieldNames(RelDataType rowType) { + return isUnique(rowType.getFieldNames()); + } + + private static <T> boolean isUnique(List<T> list) { + return new HashSet<>(list).size() == list.size(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java new file mode 100644 index 000000000..f32fa590f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRule.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.JoinRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.*; +import org.eigenbase.trace.EigenbaseTrace; + +import java.util.logging.Logger; + +/** + * Rule that converts a {@link JoinRel} to a {@link DrillJoinRel}, which is implemented by Drill "join" operation. + */ +public class DrillJoinRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillJoinRule(); + protected static final Logger tracer = EigenbaseTrace.getPlannerTracer(); + + private DrillJoinRule() { + super( + RelOptHelper.some(JoinRel.class, Convention.NONE, RelOptHelper.any(RelNode.class), RelOptHelper.any(RelNode.class)), + "DrillJoinRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final JoinRel join = (JoinRel) call.rel(0); + final RelNode left = call.rel(1); + final RelNode right = call.rel(2); + final RelTraitSet traits = join.getTraitSet().plus(DrillRel.CONVENTION); + + final RelNode convertedLeft = convert(left, traits); + final RelNode convertedRight = convert(right, traits); + try { + call.transformTo(new DrillJoinRel(join.getCluster(), traits, convertedLeft, convertedRight, join.getCondition(), + join.getJoinType())); + } catch (InvalidRelException e) { + tracer.warning(e.toString()); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java new file mode 100644 index 000000000..54be0527e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRel.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.math.BigDecimal; +import java.util.List; + +import org.apache.drill.common.logical.data.Limit; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SingleRel; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; +import org.eigenbase.sql.type.SqlTypeName; + +public class DrillLimitRel extends SingleRel implements DrillRel { + private RexNode offset; + private RexNode fetch; + + public DrillLimitRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) { + super(cluster, traitSet, child); + this.offset = offset; + this.fetch = fetch; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillLimitRel(getCluster(), traitSet, sole(inputs), offset, fetch); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + LogicalOperator inputOp = implementor.visitChild(this, 0, getChild()); + + // First offset to include into results (inclusive). Null implies it is starting from offset 0 + int first = offset != null ? Math.max(0, RexLiteral.intValue(offset)) : 0; + + // Last offset to stop including into results (exclusive), translating fetch row counts into an offset. + // Null value implies including entire remaining result set from first offset + Integer last = fetch != null ? Math.max(0, RexLiteral.intValue(fetch)) + first : null; + Limit limit = new Limit(first, last); + limit.setInput(inputOp); + return limit; + } + + public static DrillLimitRel convert(Limit limit, ConversionContext context) throws InvalidRelException{ + RelNode input = context.toRel(limit.getInput()); + RexNode first = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getFirst()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RexNode last = context.getRexBuilder().makeExactLiteral(BigDecimal.valueOf(limit.getLast()), context.getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + return new DrillLimitRel(context.getCluster(), context.getLogicalTraits(), input, first, last); + } + + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java new file mode 100644 index 000000000..85c594e45 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SortRel; +import org.eigenbase.relopt.Convention; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTraitSet; + +/** + * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and Limit Rel + */ +public class DrillLimitRule extends RelOptRule { + public static DrillLimitRule INSTANCE = new DrillLimitRule(); + + private DrillLimitRule() { + super(RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillLimitRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final SortRel sort = call.rel(0); + return sort.offset != null || sort.fetch != null; + } + + @Override + public void onMatch(RelOptRuleCall call) { + final SortRel incomingSort = call.rel(0); + final RelTraitSet incomingTraits = incomingSort.getTraitSet(); + RelNode input = incomingSort.getChild(); + + // if the Optiq sort rel includes a collation and a limit, we need to create a copy the sort rel that excludes the + // limit information. + if (!incomingSort.getCollation().getFieldCollations().isEmpty()) { + input = incomingSort.copy(incomingTraits, input, incomingSort.getCollation(), null, null); + } + + RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.CONVENTION)); + call.transformTo(new DrillLimitRel(incomingSort.getCluster(), incomingTraits.plus(DrillRel.CONVENTION), convertedInput, incomingSort.offset, incomingSort.fetch)); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java new file mode 100644 index 000000000..2e29bd425 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.math.BigDecimal; +import java.util.Collections; +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.expression.ValueExpressions.LongExpression; +import org.apache.drill.exec.record.NullExpression; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexCorrelVariable; +import org.eigenbase.rex.RexDynamicParam; +import org.eigenbase.rex.RexFieldAccess; +import org.eigenbase.rex.RexInputRef; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexLocalRef; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexOver; +import org.eigenbase.rex.RexRangeRef; +import org.eigenbase.rex.RexVisitorImpl; +import org.eigenbase.sql.SqlSyntax; +import org.eigenbase.sql.fun.SqlStdOperatorTable; + +import com.google.common.collect.Lists; + +/** + * Utilities for Drill's planner. + */ +public class DrillOptiq { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOptiq.class); + + /** + * Converts a tree of {@link RexNode} operators into a scalar expression in Drill syntax. + */ + static LogicalExpression toDrill(DrillParseContext context, RelNode input, RexNode expr) { + final RexToDrill visitor = new RexToDrill(context, input); + return expr.accept(visitor); + } + + private static class RexToDrill extends RexVisitorImpl<LogicalExpression> { + private final RelNode input; + private final DrillParseContext context; + + RexToDrill(DrillParseContext context, RelNode input) { + super(true); + this.context = context; + this.input = input; + } + + @Override + public LogicalExpression visitInputRef(RexInputRef inputRef) { + final int index = inputRef.getIndex(); + final RelDataTypeField field = input.getRowType().getFieldList().get(index); + return new FieldReference(field.getName()); + } + + @Override + public LogicalExpression visitCall(RexCall call) { + logger.debug("RexCall {}, {}", call); + final SqlSyntax syntax = call.getOperator().getSyntax(); + switch (syntax) { + case BINARY: + logger.debug("Binary"); + LogicalExpression op1 = call.getOperands().get(0).accept(this); + LogicalExpression op2 = call.getOperands().get(1).accept(this); + return context.getRegistry().createExpression(call.getOperator().getName(), Lists.newArrayList(op1, op2)); + case FUNCTION: + logger.debug("Function"); + List<LogicalExpression> exprs = Lists.newArrayList(); + for(RexNode n : call.getOperands()){ + exprs.add(n.accept(this)); + } + return context.getRegistry().createExpression(call.getOperator().getName().toLowerCase(), Lists.newArrayList(exprs)); + case SPECIAL: + logger.debug("Special"); + switch(call.getKind()){ + + case CAST: + return getDrillCastFunctionFromOptiq(call); + } + + if (call.getOperator() == SqlStdOperatorTable.ITEM) { + SchemaPath left = (SchemaPath) call.getOperands().get(0).accept(this); + final RexLiteral literal = (RexLiteral) call.getOperands().get(1); + return left.getChild((String) literal.getValue2()); + } + + // fall through + default: + throw new AssertionError("todo: implement syntax " + syntax + "(" + call + ")"); + } + } + + private LogicalExpression doUnknown(Object o){ + logger.warn("Doesn't currently support consumption of {}.", o); + return NullExpression.INSTANCE; + } + @Override + public LogicalExpression visitLocalRef(RexLocalRef localRef) { + return doUnknown(localRef); + } + + @Override + public LogicalExpression visitOver(RexOver over) { + return doUnknown(over); + } + + @Override + public LogicalExpression visitCorrelVariable(RexCorrelVariable correlVariable) { + return doUnknown(correlVariable); + } + + @Override + public LogicalExpression visitDynamicParam(RexDynamicParam dynamicParam) { + return doUnknown(dynamicParam); + } + + @Override + public LogicalExpression visitRangeRef(RexRangeRef rangeRef) { + return doUnknown(rangeRef); + } + + @Override + public LogicalExpression visitFieldAccess(RexFieldAccess fieldAccess) { + return super.visitFieldAccess(fieldAccess); + } + + + private LogicalExpression getDrillCastFunctionFromOptiq(RexCall call){ + LogicalExpression arg = call.getOperands().get(0).accept(this); + List<LogicalExpression> args = Collections.singletonList(arg); + String fname = null; + switch(call.getType().getSqlTypeName().getName()){ + case "VARCHAR": { + args = Lists.newArrayList(arg, new LongExpression(call.getType().getPrecision())); + return context.getRegistry().createExpression("castVARCHAR", args); + } + case "INTEGER": fname = "castINT"; break; + case "FLOAT": fname = "castFLOAT4"; break; + case "DOUBLE": fname = "castFLOAT8"; break; + case "DECIMAL": throw new UnsupportedOperationException("Need to add decimal."); + default: fname = "cast" + call.getType().getSqlTypeName().getName(); + } + return context.getRegistry().createExpression(fname, args); + + } + + + + @Override + public LogicalExpression visitLiteral(RexLiteral literal) { + switch(literal.getTypeName()){ + case BIGINT: + long l = ((BigDecimal) literal.getValue()).longValue(); + return ValueExpressions.getBigInt(l); + case BOOLEAN: + return ValueExpressions.getBit(((Boolean) literal.getValue())); + case CHAR: + return ValueExpressions.getChar(((String) literal.getValue())); + case DOUBLE: + double d = ((BigDecimal) literal.getValue()).doubleValue(); + return ValueExpressions.getFloat8(d); + case FLOAT: + float f = ((BigDecimal) literal.getValue()).floatValue(); + return ValueExpressions.getFloat4(f); + case INTEGER: + case DECIMAL: + int i = ((BigDecimal) literal.getValue()).intValue(); + return ValueExpressions.getInt(i); + case VARCHAR: + return ValueExpressions.getChar(((String) literal.getValue())); + default: + throw new UnsupportedOperationException(String.format("Unable to convert the value of %s and type %s to a Drill constant expression.", literal, literal.getTypeName())); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java new file mode 100644 index 000000000..b82fef56a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillParseContext.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.drill.common.expression.FunctionRegistry; + +public class DrillParseContext { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParseContext.class); + + private final FunctionRegistry registry; + + public DrillParseContext(FunctionRegistry registry) { + super(); + this.registry = registry; + } + + public FunctionRegistry getRegistry(){ + return registry; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java new file mode 100644 index 000000000..ee3a0f45a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.logical.data.Project; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.ProjectRelBase; +import org.eigenbase.rel.RelCollation; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.reltype.RelDataTypeFieldImpl; +import org.eigenbase.reltype.RelRecordType; +import org.eigenbase.rex.RexNode; +import org.eigenbase.sql.type.SqlTypeName; +import org.eigenbase.util.Pair; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; + +/** + * Project implemented in Drill. + */ +public class DrillProjectRel extends ProjectRelBase implements DrillRel { + protected DrillProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps, + RelDataType rowType) { + super(cluster, traits, child, exps, rowType, Flags.BOXED); + assert getConvention() == CONVENTION; + } + + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillProjectRel(getCluster(), traitSet, sole(inputs), new ArrayList<RexNode>(exps), rowType); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(0.1); + } + + private List<Pair<RexNode, String>> projects() { + return Pair.zip(exps, getRowType().getFieldNames()); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + LogicalOperator inputOp = implementor.visitChild(this, 0, getChild()); + Project.Builder builder = Project.builder(); + builder.setInput(inputOp); + for (Pair<RexNode, String> pair : projects()) { + LogicalExpression expr = DrillOptiq.toDrill(implementor.getContext(), getChild(), pair.left); + builder.addExpr(new FieldReference("output." + pair.right), expr); + } + return builder.build(); + } + + public static DrillProjectRel convert(Project project, ConversionContext context) throws InvalidRelException{ + RelNode input = context.toRel(project.getInput()); + List<RelDataTypeField> fields = Lists.newArrayList(); + List<RexNode> exps = Lists.newArrayList(); + for(NamedExpression expr : project.getSelections()){ + fields.add(new RelDataTypeFieldImpl(expr.getRef().getPath().toString(), fields.size(), context.getTypeFactory().createSqlType(SqlTypeName.ANY) )); + exps.add(context.toRex(expr.getExpr())); + } + return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields)); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java new file mode 100644 index 000000000..bf5bcffc7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRule.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.Convention; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTraitSet; + +/** + * Rule that converts a {@link org.eigenbase.rel.ProjectRel} to a Drill "project" operation. + */ +public class DrillProjectRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillProjectRule(); + + private DrillProjectRule() { + super(RelOptHelper.some(ProjectRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillProjectRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final ProjectRel project = (ProjectRel) call.rel(0); + final RelNode input = call.rel(1); + final RelTraitSet traits = project.getTraitSet().plus(DrillRel.CONVENTION); + final RelNode convertedInput = convert(input, traits); + call.transformTo(new DrillProjectRel(project.getCluster(), traits, convertedInput, project.getProjects(), project + .getRowType())); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java new file mode 100644 index 000000000..63a7207c1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.drill.common.logical.data.LogicalOperator; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.Convention; + +/** + * Relational expression that is implemented in Drill. + */ +public interface DrillRel extends RelNode { + /** Calling convention for relational expressions that are "implemented" by + * generating Drill logical plans. */ + Convention CONVENTION = new Convention.Impl("DRILL", DrillRel.class); + + LogicalOperator implement(DrillImplementor implementor); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java new file mode 100644 index 000000000..53da67f6c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.Iterator; + +import net.hydromatic.optiq.tools.RuleSet; + +import org.eigenbase.rel.rules.MergeProjectRule; +import org.eigenbase.rel.rules.PushFilterPastJoinRule; +import org.eigenbase.rel.rules.PushFilterPastProjectRule; +import org.eigenbase.rel.rules.PushJoinThroughJoinRule; +import org.eigenbase.rel.rules.PushSortPastProjectRule; +import org.eigenbase.rel.rules.ReduceAggregatesRule; +import org.eigenbase.rel.rules.RemoveDistinctAggregateRule; +import org.eigenbase.rel.rules.RemoveDistinctRule; +import org.eigenbase.rel.rules.RemoveSortRule; +import org.eigenbase.rel.rules.RemoveTrivialCalcRule; +import org.eigenbase.rel.rules.RemoveTrivialProjectRule; +import org.eigenbase.rel.rules.SwapJoinRule; +import org.eigenbase.rel.rules.TableAccessRule; +import org.eigenbase.rel.rules.UnionToDistinctRule; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.volcano.AbstractConverter.ExpandConversionRule; + +import com.google.common.collect.ImmutableSet; + +public class DrillRuleSets { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuleSets.class); + + public static final RuleSet DRILL_BASIC_RULES = new DrillRuleSet(ImmutableSet.of( // + +// ExpandConversionRule.instance, +// SwapJoinRule.instance, +// RemoveDistinctRule.instance, +// UnionToDistinctRule.instance, +// RemoveTrivialProjectRule.instance, +// RemoveTrivialCalcRule.instance, +// RemoveSortRule.INSTANCE, +// +// TableAccessRule.instance, // +// MergeProjectRule.instance, // +// PushFilterPastProjectRule.instance, // +// PushFilterPastJoinRule.FILTER_ON_JOIN, // +// RemoveDistinctAggregateRule.instance, // +// ReduceAggregatesRule.instance, // +// SwapJoinRule.instance, // +// PushJoinThroughJoinRule.RIGHT, // +// PushJoinThroughJoinRule.LEFT, // +// PushSortPastProjectRule.INSTANCE, // + + DrillScanRule.INSTANCE, + DrillFilterRule.INSTANCE, + DrillProjectRule.INSTANCE, + DrillAggregateRule.INSTANCE, + + DrillLimitRule.INSTANCE, + DrillSortRule.INSTANCE, + DrillJoinRule.INSTANCE, + DrillUnionRule.INSTANCE + )); + + + private static class DrillRuleSet implements RuleSet{ + final ImmutableSet<RelOptRule> rules; + + public DrillRuleSet(ImmutableSet<RelOptRule> rules) { + super(); + this.rules = rules; + } + + @Override + public Iterator<RelOptRule> iterator() { + return rules.iterator(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java new file mode 100644 index 000000000..afc2d1bd4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.TableAccessRelBase; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptTable; +import org.eigenbase.relopt.RelTraitSet; + +/** + * GroupScan of a Drill table. + */ +public class DrillScanRel extends TableAccessRelBase implements DrillRel { + private final DrillTable drillTable; + + /** Creates a DrillScan. */ + public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table) { + super(cluster, traits, table); + assert getConvention() == CONVENTION; + this.drillTable = table.unwrap(DrillTable.class); + assert drillTable != null; + } + +// @Override +// public void register(RelOptPlanner planner) { +// super.register(planner); +// DrillOptiq.registerStandardPlannerRules(planner); +// } + + public LogicalOperator implement(DrillImplementor implementor) { + Scan.Builder builder = Scan.builder(); + builder.storageEngine(drillTable.getStorageEngineName()); + builder.selection(new JSONOptions(drillTable.getSelection())); + //builder.outputReference(new FieldReference("_MAP")); + implementor.registerSource(drillTable); + return builder.build(); + } + + public static DrillScanRel convert(Scan scan, ConversionContext context){ + return new DrillScanRel(context.getCluster(), context.getLogicalTraits(), context.getTable(scan)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java new file mode 100644 index 000000000..58e648a73 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRule.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel; + +import org.eigenbase.relopt.Convention; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTraitSet; + +public class DrillScanRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillScanRule(); + + private DrillScanRule() { + super(RelOptHelper.any(EnumerableTableAccessRel.class), "DrillTableRule"); + } + + + + + @Override + public void onMatch(RelOptRuleCall call) { + final EnumerableTableAccessRel access = (EnumerableTableAccessRel) call.rel(0); + final RelTraitSet traits = access.getTraitSet().plus(DrillRel.CONVENTION); + call.transformTo(new DrillScanRel(access.getCluster(), traits, access.getTable())); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java new file mode 100644 index 000000000..829947adb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScreenRel.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import net.hydromatic.optiq.impl.java.JavaTypeFactory; +import net.hydromatic.optiq.rules.java.EnumerableConvention; +import net.hydromatic.optiq.rules.java.EnumerableRel; +import net.hydromatic.optiq.rules.java.EnumerableRelImplementor; +import net.hydromatic.optiq.rules.java.JavaRowFormat; +import net.hydromatic.optiq.rules.java.PhysType; +import net.hydromatic.optiq.rules.java.PhysTypeImpl; + +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Store; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SingleRel; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.relopt.RelTraitSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Relational expression that converts from Drill to Enumerable. At runtime it executes a Drill query and returns the + * results as an {@link net.hydromatic.linq4j.Enumerable}. + */ +public class DrillScreenRel extends SingleRel implements DrillRel { + private static final Logger logger = LoggerFactory.getLogger(DrillScreenRel.class); + + private PhysType physType; + + public DrillScreenRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode input) { + super(cluster, traitSet, input); + assert input.getConvention() == DrillRel.CONVENTION; + physType = PhysTypeImpl.of((JavaTypeFactory) cluster.getTypeFactory(), input.getRowType(), JavaRowFormat.ARRAY); + } + + public PhysType getPhysType() { + return physType; + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(.1); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillScreenRel(getCluster(), traitSet, sole(inputs)); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + LogicalOperator childOp = implementor.visitChild(this, 0, getChild()); + return Store.builder().setInput(childOp).storageEngine("--SCREEN--").build(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java new file mode 100644 index 000000000..04af9d5b4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRel.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.logical.data.Order.Ordering; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelCollation; +import org.eigenbase.rel.RelCollationImpl; +import org.eigenbase.rel.RelFieldCollation; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.SortRel; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexNode; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Sort implemented in Drill. + */ +public class DrillSortRel extends SortRel implements DrillRel { + + /** Creates a DrillSortRel. */ + public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation) { + super(cluster, traits, input, collation); + } + + /** Creates a DrillSortRel with offset and fetch. */ + public DrillSortRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) { + super(cluster, traits, input, collation, offset, fetch); + } + + @Override + public DrillSortRel copy(RelTraitSet traitSet, RelNode input, RelCollation collation, RexNode offset, RexNode fetch) { + return new DrillSortRel(getCluster(), traitSet, input, collation, offset, fetch); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + final Order.Builder builder = Order.builder(); + builder.setInput(implementor.visitChild(this, 0, getChild())); + + final List<String> childFields = getChild().getRowType().getFieldNames(); + for(RelFieldCollation fieldCollation : this.collation.getFieldCollations()){ + builder.addOrdering(fieldCollation.getDirection(), + new FieldReference(childFields.get(fieldCollation.getFieldIndex())), + fieldCollation.nullDirection); + } + return builder.build(); + } + + + public static RelNode convert(Order order, ConversionContext context) throws InvalidRelException{ + + // if there are compound expressions in the order by, we need to convert into projects on either side. + RelNode input = context.toRel(order.getInput()); + List<String> fields = input.getRowType().getFieldNames(); + + // build a map of field names to indices. + Map<String, Integer> fieldMap = Maps.newHashMap(); + int i =0; + for(String field : fields){ + fieldMap.put(field, i); + i++; + } + + List<RelFieldCollation> collations = Lists.newArrayList(); + + for(Ordering o : order.getOrderings()){ + String fieldName = ExprHelper.getFieldName(o.getExpr()); + int fieldId = fieldMap.get(fieldName); + RelFieldCollation c = new RelFieldCollation(fieldId, o.getDirection(), o.getNullDirection()); + } + return new DrillSortRel(context.getCluster(), context.getLogicalTraits(), input, RelCollationImpl.of(collations)); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java new file mode 100644 index 000000000..c968e8545 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSortRule.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.SortRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.*; + +/** + * Rule that converts an {@link SortRel} to a {@link DrillSortRel}, implemented by a Drill "order" operation. + */ +public class DrillSortRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillSortRule(); + + private DrillSortRule() { + super(RelOptHelper.some(SortRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillSortRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + final SortRel sort = call.rel(0); + return sort.offset == null && sort.fetch == null; + } + + @Override + public void onMatch(RelOptRuleCall call) { + + final SortRel sort = call.rel(0); + + final RelNode input = call.rel(1); + final RelTraitSet traits = sort.getTraitSet().plus(DrillRel.CONVENTION); + + final RelNode convertedInput = convert(input, input.getTraitSet().plus(DrillRel.CONVENTION)); + call.transformTo(new DrillSortRel(sort.getCluster(), traits, convertedInput, sort.getCollation())); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java new file mode 100644 index 000000000..30c78109e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillStoreRel.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import net.hydromatic.optiq.prepare.Prepare.CatalogReader; + +import org.apache.drill.common.logical.data.LogicalOperator; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.TableModificationRelBase; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptTable; +import org.eigenbase.relopt.RelTraitSet; + +public class DrillStoreRel extends TableModificationRelBase implements DrillRel{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillStoreRel.class); + + protected DrillStoreRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, CatalogReader catalogReader, + RelNode child, Operation operation, List<String> updateColumnList, boolean flattened) { + super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened); + + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + return null; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java new file mode 100644 index 000000000..30dd48de2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.Collections; + +import net.hydromatic.optiq.Schema.TableType; +import net.hydromatic.optiq.Statistic; +import net.hydromatic.optiq.Statistics; +import net.hydromatic.optiq.Table; + +import org.apache.drill.common.logical.StorageEngineConfig; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptTable; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.sql.type.SqlTypeName; + +/** Optiq Table used by Drill. */ +public class DrillTable implements Table{ + + private final String name; + private final String storageEngineName; + public final StorageEngineConfig storageEngineConfig; + private Object selection; + + + /** Creates a DrillTable. */ + public DrillTable(String name, String storageEngineName, Object selection, StorageEngineConfig storageEngineConfig) { + this.name = name; + this.selection = selection; + this.storageEngineConfig = storageEngineConfig; + this.storageEngineName = storageEngineName; + } + + public String getName() { + return name; + } + + public StorageEngineConfig getStorageEngineConfig(){ + return storageEngineConfig; + } + + public Object getSelection() { + return selection; + } + + public String getStorageEngineName() { + return storageEngineName; + } + + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) { + return new DrillScanRel(context.getCluster(), + context.getCluster().traitSetOf(DrillRel.CONVENTION), + table); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return new RelDataTypeDrillImpl(typeFactory); + } + + @Override + public TableType getJdbcTableType() { + return null; + } + + + +// /** Factory for custom tables in Optiq schema. */ +// @SuppressWarnings("UnusedDeclaration") +// public static class Factory implements TableFactory<DrillTable> { +// +// @Override +// public DrillTable create(Schema schema, String name, Map<String, Object> operand, RelDataType rowType) { +// +// final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig(); +// final ClasspathInputConfig inputConfig = new ClasspathInputConfig(); +// inputConfig.path = "/" + name.toLowerCase() + ".json"; +// inputConfig.type = DataWriter.ConverterType.JSON; +// return createTable(schema.getTypeFactory(), (MutableSchema) schema, name, "donuts-json", rseConfig, inputConfig); +// } +// } + + + +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java new file mode 100644 index 000000000..1be9cafea --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRel.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import net.hydromatic.linq4j.Ord; + +import org.apache.drill.common.logical.data.Limit; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Union; +import org.apache.drill.exec.planner.torel.ConversionContext; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.UnionRelBase; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.relopt.RelTraitSet; + +/** + * Union implemented in Drill. + */ +public class DrillUnionRel extends UnionRelBase implements DrillRel { + /** Creates a DrillUnionRel. */ + public DrillUnionRel(RelOptCluster cluster, RelTraitSet traits, + List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + } + + @Override + public DrillUnionRel copy(RelTraitSet traitSet, List<RelNode> inputs, + boolean all) { + return new DrillUnionRel(getCluster(), traitSet, inputs, all); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + // divide cost by two to ensure cheaper than EnumerableDrillRel + return super.computeSelfCost(planner).multiplyBy(.5); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + Union.Builder builder = Union.builder(); + for (Ord<RelNode> input : Ord.zip(inputs)) { + builder.addInput(implementor.visitChild(this, input.i, input.e)); + } + builder.setDistinct(!all); + return builder.build(); + } + + public static DrillUnionRel convert(Union union, ConversionContext context) throws InvalidRelException{ + throw new UnsupportedOperationException(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java new file mode 100644 index 000000000..bc1e6f471 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillUnionRule.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.UnionRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * Rule that converts a {@link UnionRel} to a {@link DrillUnionRel}, implemented by a "union" operation. + */ +public class DrillUnionRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillUnionRule(); + + private DrillUnionRule() { + super(RelOptHelper.any(UnionRel.class, Convention.NONE), "DrillUnionRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final UnionRel union = (UnionRel) call.rel(0); + final RelTraitSet traits = union.getTraitSet().plus(DrillRel.CONVENTION); + final List<RelNode> convertedInputs = new ArrayList<>(); + for (RelNode input : union.getInputs()) { + final RelNode convertedInput = convert(input, traits); + convertedInputs.add(convertedInput); + } + call.transformTo(new DrillUnionRel(union.getCluster(), traits, convertedInputs, union.all)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java new file mode 100644 index 000000000..e77018184 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRel.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import org.apache.drill.common.logical.data.LogicalOperator; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.ValuesRelBase; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.rex.RexLiteral; + +/** + * Values implemented in Drill. + */ +public class DrillValuesRel extends ValuesRelBase implements DrillRel { + protected DrillValuesRel(RelOptCluster cluster, RelDataType rowType, List<List<RexLiteral>> tuples, RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + assert getConvention() == CONVENTION; + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return new DrillValuesRel(getCluster(), rowType, tuples, traitSet); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + return super.computeSelfCost(planner).multiplyBy(0.1); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + // Update when https://issues.apache.org/jira/browse/DRILL-57 fixed + throw new UnsupportedOperationException(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java new file mode 100644 index 000000000..fae18caff --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillValuesRule.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.ValuesRel; +import org.eigenbase.relopt.*; + +/** + * Rule that converts a {@link ValuesRel} to a Drill "values" operation. + */ +public class DrillValuesRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillValuesRule(); + + private DrillValuesRule() { + super(RelOptHelper.any(ValuesRel.class, Convention.NONE), "DrillValuesRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final ValuesRel values = (ValuesRel) call.rel(0); + final RelTraitSet traits = values.getTraitSet().plus(DrillRel.CONVENTION); + call.transformTo(new DrillValuesRel(values.getCluster(), values.getRowType(), values.getTuples(), traits)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java new file mode 100644 index 000000000..2fcf6600c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/EnumerableDrillRule.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import net.hydromatic.optiq.rules.java.EnumerableConvention; + +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.convert.ConverterRule; + +/** + * Rule that converts any Drill relational expression to enumerable format by adding a {@link DrillScreenRel}. + */ +public class EnumerableDrillRule extends ConverterRule { + + public static EnumerableDrillRule INSTANCE = new EnumerableDrillRule(EnumerableConvention.INSTANCE); + + + private EnumerableDrillRule(EnumerableConvention outConvention) { + super(RelNode.class, DrillRel.CONVENTION, outConvention, "EnumerableDrillRule." + outConvention); + } + + @Override + public boolean isGuaranteed() { + return true; + } + + @Override + public RelNode convert(RelNode rel) { + assert rel.getTraitSet().contains(DrillRel.CONVENTION); + return new DrillScreenRel(rel.getCluster(), rel.getTraitSet().replace(getOutConvention()), rel); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java new file mode 100644 index 000000000..9456a8128 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ExprHelper.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.List; + +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; + +public class ExprHelper { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExprHelper.class); + + private final static String COMPOUND_FAIL_MESSAGE = "The current Optiq based logical plan interpreter does not complicated expressions. For Order By and Filter"; + + public static String getAggregateFieldName(FunctionCall c){ + List<LogicalExpression> exprs = c.args; + if(exprs.size() != 1) throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE); + return getFieldName(exprs.iterator().next()); + } + + public static String getFieldName(LogicalExpression e){ + if(e instanceof SchemaPath) return ((SchemaPath) e).getPath().toString(); + throw new UnsupportedOperationException(COMPOUND_FAIL_MESSAGE); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java new file mode 100644 index 000000000..835e4a558 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelDataTypeDrillImpl.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + + + + +import org.eigenbase.reltype.*; +import org.eigenbase.sql.SqlCollation; +import org.eigenbase.sql.SqlIdentifier; +import org.eigenbase.sql.SqlIntervalQualifier; +import org.eigenbase.sql.type.SqlTypeName; + +import java.nio.charset.Charset; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +/* We use an instance of this class as the row type for + * Drill table. Since we don't know the schema before hand + * whenever optiq requires us to validate that a field exists + * we always return true and indicate that the type of that + * field is 'ANY' + */ +public class RelDataTypeDrillImpl extends RelDataTypeImpl { + + private RelDataTypeField defaultField; + RelDataTypeFactory typeFactory; + List<RelDataTypeField> drillFieldList = new LinkedList<>(); + List<String> drillfieldNames = new LinkedList<>(); + + + public RelDataTypeDrillImpl(RelDataTypeFactory typeFactory) { + this.typeFactory = typeFactory; + computeDigest(); + } + + @Override + public List<RelDataTypeField> getFieldList() { + + if (drillFieldList.size() == 0) + { + /* By default we only have a single row in drill of 'ANY' type + * (mainly for select * type queries) + */ + defaultField = new RelDataTypeFieldImpl("*", 0, typeFactory.createSqlType(SqlTypeName.ANY)); + + drillFieldList.add(defaultField); + drillfieldNames.add("*"); + } + return drillFieldList; + } + + + @Override + public int getFieldCount() { + return drillFieldList.size(); + } + +// @Override +// public int getFieldOrdinal(String fieldName, boolean caseSensitive) { +// +// /* Get the list of fields and return the +// * index if the field exists +// */ +// for (RelDataTypeField field : drillFieldList) { +// if (field.getName().equals(fieldName)) +// return field.getIndex(); +// } +// +// /* Couldn't find the field in our list, return -1 +// * Unsure if I should add it to our list of fields +// */ +// return -1; +// } + + @Override + /** + * + */ + public RelDataTypeField getField(String fieldName, boolean caseSensitive) { + + /* First check if this field name exists in our field list */ + for (RelDataTypeField field : drillFieldList) + { + if (field.getName().equals(fieldName)) + return field; + } + + /* This field does not exist in our field list add it */ + RelDataTypeField newField = new RelDataTypeFieldImpl(fieldName, drillFieldList.size(), typeFactory.createSqlType(SqlTypeName.ANY)); + + /* Add it to the list of fields */ + drillFieldList.add(newField); + + /* Add the name to our list of field names */ + drillfieldNames.add(fieldName); + + return newField; + } + + + @Override + public List<String> getFieldNames() { + + if (drillfieldNames.size() == 0) { + drillfieldNames.add("*"); + } + + return drillfieldNames; + } + + @Override + public SqlTypeName getSqlTypeName() { + return null; + } + + @Override + protected void generateTypeString(StringBuilder sb, boolean withDetail) { + sb.append("DrillRecordRow"); + } + + @Override + public boolean isStruct() { + return true; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java new file mode 100644 index 000000000..92272f879 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleOperand; +import org.eigenbase.relopt.RelTrait; + +public class RelOptHelper { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RelOptHelper.class); + + public static RelOptRuleOperand any(Class<? extends RelNode> first, RelTrait trait){ + return RelOptRule.operand(first, trait, RelOptRule.any()); + } + + public static RelOptRuleOperand any(Class<? extends RelNode> first){ + return RelOptRule.operand(first, RelOptRule.any()); + } + + public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){ + return RelOptRule.operand(rel, RelOptRule.some(first, rest)); + } + + public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelTrait trait, RelOptRuleOperand first, RelOptRuleOperand... rest){ + return RelOptRule.operand(rel, trait, RelOptRule.some(first, rest)); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java new file mode 100644 index 000000000..980977085 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/ScanFieldDeterminer.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; +import org.apache.drill.common.logical.LogicalPlan; +import org.apache.drill.common.logical.data.Constant; +import org.apache.drill.common.logical.data.Filter; +import org.apache.drill.common.logical.data.GroupingAggregate; +import org.apache.drill.common.logical.data.Join; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.Limit; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.logical.data.Order.Ordering; +import org.apache.drill.common.logical.data.Project; +import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.common.logical.data.SinkOperator; +import org.apache.drill.common.logical.data.Store; +import org.apache.drill.common.logical.data.Union; +import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * This visitor will walk a logical plan and record in a map the list of field references associated to each scan. These + * can then be used to update scan object to appear to be explicitly fielded for optimization purposes. + */ +public class ScanFieldDeterminer extends AbstractLogicalVisitor<Void, ScanFieldDeterminer.FieldList, RuntimeException> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanFieldDeterminer.class); + + private FieldReferenceFinder finder = new FieldReferenceFinder(); + private Map<Scan, FieldList> scanFields = Maps.newHashMap(); + + + public static Map<Scan, FieldList> getFieldLists(LogicalPlan plan){ + Collection<SinkOperator> ops = plan.getGraph().getRoots(); + Preconditions.checkArgument(ops.size() == 1, "Scan Field determiner currently only works with plans that have a single root."); + ScanFieldDeterminer sfd = new ScanFieldDeterminer(); + ops.iterator().next().accept(sfd, new FieldList()); + return sfd.scanFields; + } + + private ScanFieldDeterminer(){ + } + + public static class FieldList { + private Set<SchemaPath> projected = Sets.newHashSet(); + private Set<SchemaPath> referenced = Sets.newHashSet(); + + public void addProjected(SchemaPath path) { + projected.add(path); + } + + public void addReferenced(SchemaPath path) { + referenced.add(path); + } + + public void addReferenced(Collection<SchemaPath> paths) { + referenced.addAll(paths); + } + + public void addProjected(Collection<SchemaPath> paths) { + projected.addAll(paths); + } + + public FieldList clone() { + FieldList newList = new FieldList(); + for (SchemaPath p : projected) { + newList.addProjected(p); + } + for (SchemaPath p : referenced) { + newList.addReferenced(p); + } + return newList; + } + } + + @Override + public Void visitScan(Scan scan, FieldList value) { + if (value == null) { + scanFields.put(scan, new FieldList()); + } else { + scanFields.put(scan, value); + } + return null; + } + + @Override + public Void visitStore(Store store, FieldList value) { + store.getInput().accept(this, value); + return null; + } + + @Override + public Void visitGroupingAggregate(GroupingAggregate groupBy, FieldList value) { + FieldList list = new FieldList(); + for (NamedExpression e : groupBy.getExprs()) { + list.addProjected(e.getExpr().accept(finder, null)); + } + for (NamedExpression e : groupBy.getKeys()) { + list.addProjected(e.getExpr().accept(finder, null)); + } + groupBy.getInput().accept(this, list); + return null; + } + + @Override + public Void visitFilter(Filter filter, FieldList value) { + value.addReferenced(filter.getExpr().accept(finder, null)); + return null; + } + + @Override + public Void visitProject(Project project, FieldList value) { + FieldList fl = new FieldList(); + for (NamedExpression e : project.getSelections()) { + fl.addProjected(e.getExpr().accept(finder, null)); + } + return null; + } + + @Override + public Void visitConstant(Constant constant, FieldList value) { + return null; + } + + @Override + public Void visitOrder(Order order, FieldList fl) { + for (Ordering o : order.getOrderings()) { + fl.addReferenced(o.getExpr().accept(finder, null)); + } + return null; + } + + @Override + public Void visitJoin(Join join, FieldList fl) { + { + FieldList leftList = fl.clone(); + for (JoinCondition c : join.getConditions()) { + leftList.addReferenced(c.getLeft().accept(finder, null)); + } + join.getLeft().accept(this, leftList); + } + + { + FieldList rightList = fl.clone(); + for (JoinCondition c : join.getConditions()) { + rightList.addReferenced(c.getRight().accept(finder, null)); + } + join.getLeft().accept(this, rightList); + } + return null; + } + + @Override + public Void visitLimit(Limit limit, FieldList value) { + limit.getInput().accept(this, value); + return null; + } + + @Override + public Void visitUnion(Union union, FieldList value) { + for (LogicalOperator o : union.getInputs()) { + o.accept(this, value.clone()); + } + return null; + } + + + /** + * Search through a LogicalExpression, finding all internal schema path references and returning them in a set. + */ + private class FieldReferenceFinder extends AbstractExprVisitor<Set<SchemaPath>, Void, RuntimeException> { + + @Override + public Set<SchemaPath> visitSchemaPath(SchemaPath path, Void value) { + Set<SchemaPath> set = Sets.newHashSet(); + set.add(path); + return set; + } + + @Override + public Set<SchemaPath> visitUnknown(LogicalExpression e, Void value) { + Set<SchemaPath> paths = Sets.newHashSet(); + for (LogicalExpression ex : e) { + paths.addAll(ex.accept(this, null)); + } + return paths; + } + + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java new file mode 100644 index 000000000..a020ab392 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.logical.StorageEngineConfig; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; + +public class StorageEngines implements Iterable<Map.Entry<String, StorageEngineConfig>>{ + + private Map<String, StorageEngineConfig> storage; + + @JsonCreator + public StorageEngines(@JsonProperty("storage") Map<String, StorageEngineConfig> storage){ + this.storage = storage; + } + + public static void main(String[] args) throws Exception{ + DrillConfig config = DrillConfig.create(); + String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8); + StorageEngines se = config.getMapper().readValue(data, StorageEngines.class); + System.out.println(se); + } + + @Override + public String toString() { + final int maxLen = 10; + return "StorageEngines [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]"; + } + + @Override + public Iterator<Entry<String, StorageEngineConfig>> iterator() { + return storage.entrySet().iterator(); + } + + private String toString(Collection<?> collection, int maxLen) { + StringBuilder builder = new StringBuilder(); + builder.append("["); + int i = 0; + for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) { + if (i > 0) + builder.append(", "); + builder.append(iterator.next()); + } + builder.append("]"); + return builder.toString(); + } + + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java new file mode 100644 index 000000000..23d03b81c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSchemaFactory.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import net.hydromatic.linq4j.function.Function1; +import net.hydromatic.optiq.Schema; +import net.hydromatic.optiq.SchemaPlus; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.logical.StorageEngineConfig; +import org.apache.drill.exec.exception.SetupException; +import org.apache.drill.exec.planner.logical.StorageEngines; +import org.apache.drill.exec.store.SchemaProvider; +import org.apache.drill.exec.store.SchemaProviderRegistry; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class DrillSchemaFactory implements Function1<SchemaPlus, Schema>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSchemaFactory.class); + + private final SchemaProviderRegistry registry; + private final Map<String, StorageEngineEntry> preEntries = Maps.newHashMap(); + + public static DrillSchemaFactory createEmpty(){ + return new DrillSchemaFactory(); + } + + private DrillSchemaFactory(){ + this.registry = null; + } + + public DrillSchemaFactory(StorageEngines engines, DrillConfig config) throws SetupException { + super(); + this.registry = new SchemaProviderRegistry(config); + + for (Map.Entry<String, StorageEngineConfig> entry : engines) { + SchemaProvider provider = registry.getSchemaProvider(entry.getValue()); + preEntries.put(entry.getKey(), new StorageEngineEntry(entry.getValue(), provider)); + } + + } + + public Schema apply(SchemaPlus root) { + List<String> schemaNames = Lists.newArrayList(); + Schema defaultSchema = null; + for(Entry<String, StorageEngineEntry> e : preEntries.entrySet()){ + FileSystemSchema schema = new FileSystemSchema(e.getValue().getConfig(), e.getValue().getProvider(), root, e.getKey()); + if(defaultSchema == null) defaultSchema = schema; + root.add(schema); + schemaNames.add(e.getKey()); + } + logger.debug("Registered schemas for {}", schemaNames); + return defaultSchema; + } + + + private class StorageEngineEntry{ + StorageEngineConfig config; + SchemaProvider provider; + + + public StorageEngineEntry(StorageEngineConfig config, SchemaProvider provider) { + super(); + this.config = config; + this.provider = provider; + } + + public StorageEngineConfig getConfig() { + return config; + } + public SchemaProvider getProvider() { + return provider; + } + + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java new file mode 100644 index 000000000..818e892c9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql; + +import net.hydromatic.optiq.jdbc.ConnectionConfig; +import net.hydromatic.optiq.tools.Frameworks; +import net.hydromatic.optiq.tools.Planner; +import net.hydromatic.optiq.tools.RelConversionException; +import net.hydromatic.optiq.tools.RuleSet; +import net.hydromatic.optiq.tools.ValidationException; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.FunctionRegistry; +import org.apache.drill.common.logical.LogicalPlan; +import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; +import org.apache.drill.exec.planner.logical.DrillImplementor; +import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillRuleSets; +import org.apache.drill.exec.planner.logical.DrillScreenRel; +import org.apache.drill.exec.planner.logical.DrillStoreRel; +import org.apache.drill.exec.planner.logical.StorageEngines; +import org.eigenbase.rel.RelNode; +import org.eigenbase.sql.SqlExplain; +import org.eigenbase.sql.SqlKind; +import org.eigenbase.sql.SqlLiteral; +import org.eigenbase.sql.SqlNode; +import org.eigenbase.sql.fun.SqlStdOperatorTable; +import org.eigenbase.sql.parser.SqlParseException; + +import com.google.common.base.Charsets; +import com.google.common.io.Resources; + +public class DrillSqlWorker { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class); + + private final FunctionRegistry registry; + private final Planner planner; + + public DrillSqlWorker(DrillSchemaFactory schemaFactory, FunctionRegistry functionRegistry) throws Exception { + this.registry = functionRegistry; + this.planner = Frameworks.getPlanner(ConnectionConfig.Lex.MYSQL, schemaFactory, SqlStdOperatorTable.instance(), new RuleSet[]{DrillRuleSets.DRILL_BASIC_RULES}); + + } + + + public LogicalPlan getPlan(String sql) throws SqlParseException, ValidationException, RelConversionException{ + SqlNode sqlNode = planner.parse(sql); + + ResultMode resultMode = ResultMode.EXEC; + if(sqlNode.getKind() == SqlKind.EXPLAIN){ + SqlExplain explain = (SqlExplain) sqlNode; + sqlNode = explain.operands[0]; + SqlLiteral op = (SqlLiteral) explain.operands[2]; + SqlExplain.Depth depth = (SqlExplain.Depth) op.getValue(); + switch(depth){ + case Logical: + resultMode = ResultMode.LOGICAL; + break; + case Physical: + resultMode = ResultMode.PHYSICAL; + break; + default: + } + + + } + + SqlNode validatedNode = planner.validate(sqlNode); + RelNode relNode = planner.convert(validatedNode); + RelNode convertedRelNode = planner.transform(0, planner.getEmptyTraitSet().plus(DrillRel.CONVENTION), relNode); + if(convertedRelNode instanceof DrillStoreRel){ + throw new UnsupportedOperationException(); + }else{ + convertedRelNode = new DrillScreenRel(convertedRelNode.getCluster(), convertedRelNode.getTraitSet(), convertedRelNode); + } + DrillImplementor implementor = new DrillImplementor(new DrillParseContext(registry), resultMode); + implementor.go( (DrillRel) convertedRelNode); + planner.close(); + planner.reset(); + return implementor.getPlan(); + + } + private void x() throws Exception { + String sqlAgg = "select a, count(1) from parquet.`/Users/jnadeau/region.parquet` group by a"; + String sql = "select * from parquet.`/Users/jnadeau/region.parquet`"; + + + System.out.println(sql); + System.out.println(getPlan(sql).toJsonString(DrillConfig.create())); + System.out.println("///////////"); + System.out.println(sqlAgg); + System.out.println(getPlan(sqlAgg).toJsonString(DrillConfig.create())); + } + + public static void main(String[] args) throws Exception { + DrillConfig config = DrillConfig.create(); + + String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8); + StorageEngines engines = config.getMapper().readValue(enginesData, StorageEngines.class); + FunctionRegistry fr = new FunctionRegistry(config); + DrillSchemaFactory schemaFactory = new DrillSchemaFactory(engines, config); + DrillSqlWorker worker = new DrillSqlWorker(schemaFactory, fr); + worker.x(); + } + + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java new file mode 100644 index 000000000..e348bc331 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/ExpandingConcurrentMap.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.collect.Maps; + +/** + * A special type of concurrent map which attempts to create an object before returning that it does not exist. It will also provide the same functionality on it's keyset. + * @param <KEY> The key in the map. + * @param <VALUE> The value in the map. + */ +public class ExpandingConcurrentMap<KEY, VALUE> implements ConcurrentMap<KEY, VALUE> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpandingConcurrentMap.class); + + private final ConcurrentMap<KEY, VALUE> internalMap = Maps.newConcurrentMap(); + private final DelegatingKeySet keySet = new DelegatingKeySet(); + private final MapValueFactory<KEY, VALUE> fac; + + /** + * Create a new ExpandingConcurrentMap. + * @param fac The object factory responsible for attempting to generate object instances. + */ + public ExpandingConcurrentMap(MapValueFactory<KEY, VALUE> fac) { + super(); + this.fac = fac; + } + + @Override + public int size() { + return internalMap.size(); + } + + @Override + public boolean isEmpty() { + return internalMap.isEmpty(); + } + + @Override + public boolean containsKey(Object k) { + @SuppressWarnings("unchecked") KEY key = (KEY) k; + + if(internalMap.containsKey(key)) return true; + VALUE v = getNewEntry(k); + return v != null; + } + + @Override + public boolean containsValue(Object value) { + throw new UnsupportedOperationException(); + } + + @Override + public VALUE get(Object key) { + VALUE out = internalMap.get(key); + if(out != null) return out; + return getNewEntry(key); + } + + private VALUE getNewEntry(Object k){ + @SuppressWarnings("unchecked") + KEY key = (KEY) k; + VALUE v = this.fac.create(key); + if(v == null) return null; + VALUE old = internalMap.putIfAbsent(key, v); + if(old == null) return v; + fac.destroy(v); + return old; + } + + @Override + public VALUE put(KEY key, VALUE value) { + throw new UnsupportedOperationException(); + } + + @Override + public VALUE remove(Object key) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map<? extends KEY, ? extends VALUE> m) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public Set<KEY> keySet() { + return this.keySet; + } + + @Override + public Collection<VALUE> values() { + return internalMap.values(); + } + + @Override + public Set<java.util.Map.Entry<KEY, VALUE>> entrySet() { + return internalMap.entrySet(); + } + + @Override + public VALUE putIfAbsent(KEY key, VALUE value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object key, Object value) { + return false; + } + + @Override + public boolean replace(KEY key, VALUE oldValue, VALUE newValue) { + return false; + } + + @Override + public VALUE replace(KEY key, VALUE value) { + return null; + } + + private class DelegatingKeySet implements Set<KEY>{ + + @Override + public int size() { + return ExpandingConcurrentMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ExpandingConcurrentMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ExpandingConcurrentMap.this.containsKey(o); + } + + @Override + public Iterator<KEY> iterator() { + return internalMap.keySet().iterator(); + } + + @Override + public Object[] toArray() { + return internalMap.keySet().toArray(); + } + + @Override + public <T> T[] toArray(T[] a) { + return internalMap.keySet().toArray(a); + } + + @Override + public boolean add(KEY e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection<?> c) { + for(Object o : c){ + if(this.contains(o)) return false; + } + return true; + } + + @Override + public boolean addAll(Collection<? extends KEY> c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection<?> c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + } + + public interface MapValueFactory<KEY, VALUE> { + + public VALUE create(KEY key); + public void destroy(VALUE value); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java new file mode 100644 index 000000000..ac29fc3ef --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/FileSystemSchema.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; + +import net.hydromatic.linq4j.expressions.DefaultExpression; +import net.hydromatic.linq4j.expressions.Expression; +import net.hydromatic.optiq.Schema; +import net.hydromatic.optiq.SchemaPlus; +import net.hydromatic.optiq.TableFunction; + +import org.apache.drill.common.logical.StorageEngineConfig; +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.store.SchemaProvider; + +public class FileSystemSchema implements Schema, ExpandingConcurrentMap.MapValueFactory<String, DrillTable>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchema.class); + + private ExpandingConcurrentMap<String, DrillTable> tables = new ExpandingConcurrentMap<String, DrillTable>(this); + + private final SchemaPlus parentSchema; + private final String name; + private final Expression expression = new DefaultExpression(Object.class); + private final SchemaProvider schemaProvider; + private final StorageEngineConfig config; + + public FileSystemSchema(StorageEngineConfig config, SchemaProvider schemaProvider, SchemaPlus parentSchema, String name) { + super(); + this.parentSchema = parentSchema; + this.name = name; + this.schemaProvider = schemaProvider; + this.config = config; + } + + @Override + public Schema getSubSchema(String name) { + return null; + } + + @Override + public String getName() { + return name; + } + + @Override + public Expression getExpression() { + return expression; + } + + @Override + public Collection<TableFunction> getTableFunctions(String name) { + return Collections.emptyList(); + } + + @Override + public SchemaPlus getParentSchema() { + return parentSchema; + } + + @Override + public Set<String> getTableNames() { + return tables.keySet(); + } + + @Override + public Set<String> getTableFunctionNames() { + return Collections.emptySet(); + } + + @Override + public Set<String> getSubSchemaNames() { + return Collections.emptySet(); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public DrillTable getTable(String name) { + return tables.get(name); + } + + @Override + public DrillTable create(String key) { + Object selection = schemaProvider.getSelectionBaseOnName(key); + if(selection == null) return null; + + return new DrillTable(name, this.name, selection, config); + } + + @Override + public void destroy(DrillTable value) { + } + + + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java new file mode 100644 index 000000000..d4aabb4a1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/torel/ConversionContext.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.torel; + +import java.util.List; +import java.util.Map; + +import net.hydromatic.optiq.prepare.Prepare; + +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.LogicalPlan; +import org.apache.drill.common.logical.data.Filter; +import org.apache.drill.common.logical.data.GroupingAggregate; +import org.apache.drill.common.logical.data.Join; +import org.apache.drill.common.logical.data.Limit; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Order; +import org.apache.drill.common.logical.data.Project; +import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.common.logical.data.Union; +import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; +import org.apache.drill.exec.planner.logical.DrillAggregateRel; +import org.apache.drill.exec.planner.logical.DrillFilterRel; +import org.apache.drill.exec.planner.logical.DrillJoinRel; +import org.apache.drill.exec.planner.logical.DrillLimitRel; +import org.apache.drill.exec.planner.logical.DrillProjectRel; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.logical.DrillSortRel; +import org.apache.drill.exec.planner.logical.DrillUnionRel; +import org.apache.drill.exec.planner.logical.ScanFieldDeterminer; +import org.apache.drill.exec.planner.logical.ScanFieldDeterminer.FieldList; +import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptTable; +import org.eigenbase.relopt.RelOptTable.ToRelContext; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.rex.RexBuilder; +import org.eigenbase.rex.RexNode; + +public class ConversionContext implements ToRelContext { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConversionContext.class); + + private static final ConverterVisitor VISITOR = new ConverterVisitor(); + + private final Map<Scan, FieldList> scanFieldLists; + private final RelOptCluster cluster; + private final Prepare prepare; + + public ConversionContext(RelOptCluster cluster, LogicalPlan plan) { + super(); + scanFieldLists = ScanFieldDeterminer.getFieldLists(plan); + this.cluster = cluster; + this.prepare = null; + } + + @Override + public RelOptCluster getCluster() { + return cluster; + } + + + private FieldList getFieldList(Scan scan) { + assert scanFieldLists.containsKey(scan); + return scanFieldLists.get(scan); + } + + + public RexBuilder getRexBuilder(){ + return cluster.getRexBuilder(); + } + + public RelTraitSet getLogicalTraits(){ + RelTraitSet set = RelTraitSet.createEmpty(); + set.add(DrillRel.CONVENTION); + return set; + } + + public RelNode toRel(LogicalOperator operator) throws InvalidRelException{ + return operator.accept(VISITOR, this); + } + + public RexNode toRex(LogicalExpression e){ + return null; + } + + public RelDataTypeFactory getTypeFactory(){ + return cluster.getTypeFactory(); + } + + public RelOptTable getTable(Scan scan){ + FieldList list = getFieldList(scan); + + return null; + } + + @Override + public RelNode expandView(RelDataType rowType, String queryString, List<String> schemaPath) { + throw new UnsupportedOperationException(); + } + + private static class ConverterVisitor extends AbstractLogicalVisitor<RelNode, ConversionContext, InvalidRelException>{ + + @Override + public RelNode visitScan(Scan scan, ConversionContext context){ + return DrillScanRel.convert(scan, context); + } + + @Override + public RelNode visitFilter(Filter filter, ConversionContext context) throws InvalidRelException{ + return DrillFilterRel.convert(filter, context); + } + + @Override + public RelNode visitProject(Project project, ConversionContext context) throws InvalidRelException{ + return DrillProjectRel.convert(project, context); + } + + @Override + public RelNode visitOrder(Order order, ConversionContext context) throws InvalidRelException{ + return DrillSortRel.convert(order, context); + } + + @Override + public RelNode visitJoin(Join join, ConversionContext context) throws InvalidRelException{ + return DrillJoinRel.convert(join, context); + } + + @Override + public RelNode visitLimit(Limit limit, ConversionContext context) throws InvalidRelException{ + return DrillLimitRel.convert(limit, context); + } + + @Override + public RelNode visitUnion(Union union, ConversionContext context) throws InvalidRelException{ + return DrillUnionRel.convert(union, context); + } + + @Override + public RelNode visitGroupingAggregate(GroupingAggregate groupBy, ConversionContext context) + throws InvalidRelException { + return DrillAggregateRel.convert(groupBy, context); + } + + } + + + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index 991ade6fe..ae21a3cde 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -36,6 +36,15 @@ public class BatchSchema implements Iterable<MaterializedField> { return new SchemaBuilder(); } + public int getFieldCount(){ + return fields.size(); + } + + public MaterializedField getColumn(int index){ + if(index < 0 || index >= fields.size()) return null; + return fields.get(index); + } + @Override public Iterator<MaterializedField> iterator() { return fields.iterator(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 8d952e32f..6e764a886 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -138,17 +138,18 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess @Override - @SuppressWarnings("unchecked") public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { VectorWrapper<?> va = wrappers.get(fieldId); - assert va != null; - if (va.getVectorClass() != clazz) { + if(va!= null && clazz == null){ + return (VectorWrapper<?>) va; + } + if (va != null && va.getVectorClass() != clazz) { logger.warn(String.format( "Failure while reading vector. Expected vector class of %s but was holding vector class %s.", clazz.getCanonicalName(), va.getVectorClass().getCanonicalName())); return null; } - return (VectorWrapper) va; + return (VectorWrapper<?>) va; } public BatchSchema getSchema() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index 4a43a9948..1d9fecfd6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -104,7 +104,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection } public <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType, - SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException { + SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) { return super.send(connection, rpcType, protobufBody, clazz, dataBodies); } @@ -201,6 +201,10 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection } + public void setAutoRead(boolean enableAutoRead){ + connection.setAutoRead(enableAutoRead); + } + public void close() { logger.debug("Closing client"); connection.getChannel().close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java index a0b5cfd65..c665949a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java @@ -25,6 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; import com.google.protobuf.Internal.EnumLite; import com.google.protobuf.MessageLite; @@ -48,10 +49,10 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE @Override protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { - return handle(rpcType, pBody, dBody); + return handleReponse( (ConnectionThrottle) connection, rpcType, pBody, dBody); } - protected abstract Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ; + protected abstract Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ; @Override public ServerConnection initRemoteConnection(Channel channel) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java index 0eaade8f8..a19f8d833 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java @@ -22,8 +22,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; -public abstract class RemoteConnection{ +public abstract class RemoteConnection implements ConnectionThrottle{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class); private final Channel channel; private final WriteManager writeManager; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java new file mode 100644 index 000000000..e2ffcc0bc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/ConnectionThrottle.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.user; + +public interface ConnectionThrottle { + public void setAutoRead(boolean enableAutoRead); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index 5b4a504da..5345b31a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -51,7 +51,7 @@ public class QueryResultHandler { return new SubmissionListener(listener); } - public void batchArrived(ByteBuf pBody, ByteBuf dBody) throws RpcException { + public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException { final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER); final QueryResultBatch batch = new QueryResultBatch(result, dBody); UserResultsListener l = resultsListener.get(result.getQueryId()); @@ -72,7 +72,7 @@ public class QueryResultHandler { l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList())); resultsListener.remove(result.getQueryId(), l); }else{ - l.resultArrived(batch); + l.resultArrived(batch, throttle); } if ( @@ -100,13 +100,13 @@ public class QueryResultHandler { private volatile boolean finished = false; private volatile RpcException ex; private volatile UserResultsListener output; - + private volatile ConnectionThrottle throttle; public boolean transferTo(UserResultsListener l) { synchronized (this) { output = l; boolean last = false; for (QueryResultBatch r : results) { - l.resultArrived(r); + l.resultArrived(r, throttle); last = r.getHeader().getIsLastChunk(); } if(ex != null){ @@ -119,14 +119,15 @@ public class QueryResultHandler { @Override - public void resultArrived(QueryResultBatch result) { + public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { + this.throttle = throttle; if(result.getHeader().getIsLastChunk()) finished = true; synchronized (this) { if (output == null) { this.results.add(result); } else { - output.resultArrived(result); + output.resultArrived(result, throttle); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index f28ff4b81..8b1bdecd3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -70,10 +70,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType)); } - protected Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { + protected Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { switch (rpcType) { case RpcType.QUERY_RESULT_VALUE: - queryResultHandler.batchArrived(pBody, dBody); + queryResultHandler.batchArrived(throttle, pBody, dBody); return new Response(RpcType.ACK, Ack.getDefaultInstance()); default: throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java index 3bcd0cf4b..71f59948e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java @@ -24,6 +24,6 @@ public interface UserResultsListener { public abstract void queryIdArrived(QueryId queryId); public abstract void submissionFailed(RpcException ex); - public abstract void resultArrived(QueryResultBatch result); + public abstract void resultArrived(QueryResultBatch result, ConnectionThrottle throttle); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 0f45dbee7..aba9ab702 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -94,7 +94,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec } default: - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 39821e39f..60a7d5c83 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -23,12 +23,16 @@ import java.util.Collection; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FunctionRegistry; import org.apache.drill.common.logical.StorageEngineConfig; import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.logical.StorageEngines; +import org.apache.drill.exec.planner.sql.DrillSchemaFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.WorkEventBus; @@ -37,7 +41,9 @@ import org.apache.drill.exec.store.StorageEngine; import org.apache.drill.exec.store.StorageEngineRegistry; import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Charsets; import com.google.common.base.Preconditions; +import com.google.common.io.Resources; public class DrillbitContext { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class); @@ -53,6 +59,9 @@ public class DrillbitContext { private final OperatorCreatorRegistry operatorCreatorRegistry; private final Controller controller; private final WorkEventBus workBus; + private final FunctionImplementationRegistry functionRegistry; + private final FunctionRegistry functionRegistryX; + private final DrillSchemaFactory factory; public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus) { super(); @@ -70,8 +79,34 @@ public class DrillbitContext { this.storageEngineRegistry = new StorageEngineRegistry(this); this.reader = new PhysicalPlanReader(context.getConfig(), context.getConfig().getMapper(), endpoint, storageEngineRegistry); this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig()); + this.functionRegistry = new FunctionImplementationRegistry(context.getConfig()); + + DrillSchemaFactory factory = null; + try{ + String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8); + StorageEngines engines = context.getConfig().getMapper().readValue(enginesData, StorageEngines.class); + factory = new DrillSchemaFactory(engines, context.getConfig()); + }catch(Exception e){ + logger.error("Failure reading storage engines data. Creating empty list of schemas.", e); + factory = DrillSchemaFactory.createEmpty(); + } + this.factory = factory; + this.functionRegistryX = new FunctionRegistry(context.getConfig()); + } + public DrillSchemaFactory getSchemaFactory(){ + return factory; + } + + public FunctionRegistry getFunctionRegistry(){ + return functionRegistryX; + } + + public FunctionImplementationRegistry getFunctionImplementationRegistry() { + return functionRegistry; + } + public WorkEventBus getWorkBus(){ return workBus; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 1756d9673..cb05273b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -116,7 +116,7 @@ public class ParquetGroupScan extends AbstractGroupScan { calculateEndpointBytes(); } - public ParquetGroupScan(ArrayList<ReadEntryWithPath> entries, + public ParquetGroupScan(List<ReadEntryWithPath> entries, ParquetStorageEngine storageEngine, FieldReference ref) throws IOException { this.storageEngine = storageEngine; this.engineConfig = storageEngine.getEngineConfig(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java index 86be49e32..c17a9e356 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetSchemaProvider.java @@ -52,7 +52,7 @@ public class ParquetSchemaProvider implements SchemaProvider{ @Override public Object getSelectionBaseOnName(String tableName) { try{ -// if(!fs.exists(new Path(tableName))) return null; + if(!fs.exists(new Path(tableName))) return null; ReadEntryWithPath re = new ReadEntryWithPath(tableName); return Lists.newArrayList(re); }catch(Exception e){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java index e04d3e969..45b9cc1b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java @@ -99,7 +99,7 @@ public class ParquetStorageEngine extends AbstractStorageEngine{ ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(), new TypeReference<ArrayList<ReadEntryWithPath>>() {}); - return new ParquetGroupScan(readEntries, this, scan.getOutputReference()); + return new ParquetGroupScan( Lists.newArrayList(readEntries), this, scan.getOutputReference()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index cbc8b86c7..4f8cd3373 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -167,6 +167,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe return ((data.getByte((int) Math.floor(index / 8)) & (int) Math.pow(2, (index % 8))) == 0) ? 0 : 1; } + public boolean isNull(int index){ + return false; + } + @Override public final Object getObject(int index) { return new Boolean(get(index) != 0); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java index 4e5364a91..a3d3a8a9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -125,6 +125,8 @@ public interface ValueVector extends Closeable { public abstract Object getObject(int index); public int getValueCount(); + + public boolean isNull(int index); public void reset(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java new file mode 100644 index 000000000..93089e7b6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; + +import org.apache.drill.common.types.TypeProtos.MajorType; + +abstract class AbstractSqlAccessor implements SqlAccessor { + + @Override + public abstract boolean isNull(int index); + + @Override + public BigDecimal getBigDecimal(int index) throws InvalidAccessException{ + throw new InvalidAccessException("BigDecimal"); + } + + @Override + public boolean getBoolean(int index) throws InvalidAccessException{ + throw new InvalidAccessException("boolean"); + } + + @Override + public byte getByte(int index) throws InvalidAccessException{ + throw new InvalidAccessException("byte"); + } + + @Override + public byte[] getBytes(int index) throws InvalidAccessException{ + throw new InvalidAccessException("byte[]"); + } + + @Override + public Date getDate(int index) throws InvalidAccessException{ + throw new InvalidAccessException("Date"); + } + + @Override + public double getDouble(int index) throws InvalidAccessException{ + throw new InvalidAccessException("double"); + } + + @Override + public float getFloat(int index) throws InvalidAccessException{ + throw new InvalidAccessException("float"); + } + + @Override + public int getInt(int index) throws InvalidAccessException{ + throw new InvalidAccessException("int"); + } + + @Override + public long getLong(int index) throws InvalidAccessException{ + throw new InvalidAccessException("long"); + } + + @Override + public short getShort(int index) throws InvalidAccessException{ + throw new InvalidAccessException("short"); + } + + @Override + public InputStream getStream(int index) throws InvalidAccessException{ + throw new InvalidAccessException("InputStream"); + } + + @Override + public char getChar(int index) throws InvalidAccessException{ + throw new InvalidAccessException("Char"); + } + + @Override + public Reader getReader(int index) throws InvalidAccessException{ + throw new InvalidAccessException("Reader"); + } + + @Override + public String getString(int index) throws InvalidAccessException{ + throw new InvalidAccessException("String"); + } + + @Override + public Time getTime(int index) throws InvalidAccessException{ + throw new InvalidAccessException("Time"); + } + + @Override + public Timestamp getTimestamp(int index) throws InvalidAccessException{ + throw new InvalidAccessException("Timestamp"); + } + + abstract MajorType getType(); + + + public class InvalidAccessException extends SQLException{ + public InvalidAccessException(String name){ + super(String.format("Requesting class of type %s for an object of type %s:%s is not allowed.", name, getType().getMinorType().name(), getType().getMode().name())); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java new file mode 100644 index 000000000..56113540f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +import org.apache.drill.exec.vector.accessor.AbstractSqlAccessor.InvalidAccessException; + +public interface SqlAccessor { + + public abstract boolean isNull(int index); + + public abstract BigDecimal getBigDecimal(int index) throws InvalidAccessException; + + public abstract boolean getBoolean(int index) throws InvalidAccessException; + + public abstract byte getByte(int index) throws InvalidAccessException; + + public abstract byte[] getBytes(int index) throws InvalidAccessException; + + public abstract Date getDate(int index) throws InvalidAccessException; + + public abstract double getDouble(int index) throws InvalidAccessException; + + public abstract float getFloat(int index) throws InvalidAccessException; + + public abstract char getChar(int index) throws InvalidAccessException; + + public abstract int getInt(int index) throws InvalidAccessException; + + public abstract long getLong(int index) throws InvalidAccessException; + + public abstract short getShort(int index) throws InvalidAccessException; + + public abstract InputStream getStream(int index) throws InvalidAccessException; + + public abstract Reader getReader(int index) throws InvalidAccessException; + + public abstract String getString(int index) throws InvalidAccessException; + + public abstract Time getTime(int index) throws InvalidAccessException; + + public abstract Timestamp getTimestamp(int index) throws InvalidAccessException; + + public abstract Object getObject(int index) throws InvalidAccessException; + +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 29f011f62..b6d441cff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -119,6 +119,7 @@ public class WorkManager implements Closeable{ public class WorkerBee{ public void addFragmentRunner(FragmentExecutor runner){ + logger.debug("Adding pending task {}", runner); pendingTasks.add(runner); } @@ -167,7 +168,7 @@ public class WorkManager implements Closeable{ try { while(true){ // logger.debug("Polling for pending work tasks."); - Runnable r = pendingTasks.poll(10, TimeUnit.SECONDS); + Runnable r = pendingTasks.take(); if(r != null){ logger.debug("Starting pending task {}", r); executor.execute(r); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index e4f8e7b9c..329815d20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,43 +17,58 @@ */ package org.apache.drill.exec.work.foreman; +import io.netty.buffer.ByteBuf; + import java.io.Closeable; import java.io.IOException; import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.LogicalPlan; +import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.exception.OptimizerException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; import org.apache.drill.exec.planner.fragment.PlanningSet; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.StatsCollector; +import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.proto.UserProtos.QueryResult; import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RunQuery; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.util.AtomicState; +import org.apache.drill.exec.vector.VarCharVector; import org.apache.drill.exec.work.ErrorHelper; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; +import com.google.common.base.Charsets; import com.google.common.collect.Lists; /** @@ -151,6 +166,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ case LOGICAL: parseAndRunLogicalPlan(queryRequest.getPlan()); + break; case PHYSICAL: parseAndRunPhysicalPlan(queryRequest.getPlan()); @@ -170,8 +186,18 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ try { LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json); + + if(logicalPlan.getProperties().resultMode == ResultMode.LOGICAL){ + fail("Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC.", new Exception()); + } if(logger.isDebugEnabled()) logger.debug("Logical {}", logicalPlan.unparse(context.getConfig())); PhysicalPlan physicalPlan = convert(logicalPlan); + + if(logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL){ + returnPhysical(physicalPlan); + return; + } + if(logger.isDebugEnabled()) logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan)); runPhysicalPlan(physicalPlan); } catch (IOException e) { @@ -181,6 +207,74 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } + + private void returnLogical(LogicalPlan plan){ + String jsonPlan = plan.toJsonStringSafe(context.getConfig()); + sendSingleString("logical", jsonPlan); + } + + private void returnPhysical(PhysicalPlan plan){ + String jsonPlan = plan.unparse(context.getConfig().getMapper().writer()); + sendSingleString("physical", jsonPlan); + } + + private void sendSingleString(String columnName, String value){ + MaterializedField f = MaterializedField.create(new SchemaPath(columnName, ExpressionPosition.UNKNOWN), Types.required(MinorType.VARCHAR)); + VarCharVector vector = new VarCharVector(f, bee.getContext().getAllocator()); + byte[] bytes = value.getBytes(Charsets.UTF_8); + vector.allocateNew(bytes.length, 1); + vector.getMutator().set(0, bytes); + vector.getMutator().setValueCount(1); + QueryResult header = QueryResult.newBuilder() // + .setQueryId(context.getQueryId()) // + .setRowCount(1) // + .setDef(RecordBatchDef.newBuilder().addField(vector.getMetadata()).build()) // + .setIsLastChunk(false) // + .build(); + QueryWritableBatch b1 = new QueryWritableBatch(header, vector.getBuffers()); + vector.close(); + + QueryResult header2 = QueryResult.newBuilder() // + .setQueryId(context.getQueryId()) // + .setRowCount(0) // + .setDef(RecordBatchDef.getDefaultInstance()) // + .setIsLastChunk(true) // + .build(); + QueryWritableBatch b2 = new QueryWritableBatch(header2); + + SingleListener l = new SingleListener(); + this.initiatingClient.sendResult(l, b1); + this.initiatingClient.sendResult(l, b2); + l.acct.waitForSendComplete(); + + } + + + class SingleListener implements RpcOutcomeListener<Ack>{ + + final SendingAccountor acct; + + public SingleListener(){ + acct = new SendingAccountor(); + acct.increment(); + acct.increment(); + } + + @Override + public void failed(RpcException ex) { + acct.decrement(); + fail("Failure while sending single result.", ex); + } + + @Override + public void success(Ack value, ByteBuf buffer) { + acct.decrement(); + } + + } + + + private void parseAndRunPhysicalPlan(String json) { try { PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json); @@ -192,7 +286,11 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ private void runPhysicalPlan(PhysicalPlan plan) { + if(plan.getProperties().resultMode != ResultMode.EXEC){ + fail(String.format("Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception()); + } PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor(); Fragment rootFragment; try { @@ -218,18 +316,26 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ List<PlanFragment> intermediateFragments = Lists.newArrayList(); // store fragments in distributed grid. + logger.debug("Storing fragments"); for (PlanFragment f : work.getFragments()) { // store all fragments in grid since they are part of handshake. + context.getCache().storeFragment(f); if (f.getLeafFragment()) { leafFragments.add(f); } else { intermediateFragments.add(f); } + + } + logger.debug("Fragments stored."); + + logger.debug("Submitting fragments to run."); fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments); + logger.debug("Fragments running."); } catch (ExecutionSetupException | RpcException e) { @@ -238,11 +344,32 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } - private void runSQL(String json) { - throw new UnsupportedOperationException(); + private void runSQL(String sql) { + try{ + DrillSqlWorker sqlWorker = new DrillSqlWorker(context.getSchemaFactory(), context.getFunctionRegistry()); + LogicalPlan plan = sqlWorker.getPlan(sql); + + + if(plan.getProperties().resultMode == ResultMode.LOGICAL){ + returnLogical(plan); + return; + } + + PhysicalPlan physical = convert(plan); + + if(plan.getProperties().resultMode == ResultMode.PHYSICAL){ + returnPhysical(physical); + return; + } + + runPhysicalPlan(physical); + }catch(Exception e){ + fail("Failure while parsing sql.", e); + } } private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException { + if(logger.isDebugEnabled()) logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig())); return new BasicOptimizer(DrillConfig.create(), context).optimize(new BasicOptimizer.BasicOptimizationContext(), plan); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index e9302e1fc..eaf921dfc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -76,18 +76,25 @@ class QueryManager implements FragmentStatusListener{ } public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, UserClientConnection rootClient, List<PlanFragment> leafFragments, List<PlanFragment> intermediateFragments) throws ExecutionSetupException{ + logger.debug("Setting up fragment runs."); remainingFragmentCount.set(leafFragments.size()+1); queryId = rootFragment.getHandle().getQueryId(); workBus = bee.getContext().getWorkBus(); // set up the root fragment first so we'll have incoming buffers available. { - FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, new FunctionImplementationRegistry(bee.getContext().getConfig())); + logger.debug("Setting up root context."); + FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext().getFunctionImplementationRegistry()); + logger.debug("Setting up incoming buffers"); IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext); + logger.debug("Setting buffers on root context."); rootContext.setBuffers(buffers); + logger.debug("Generating Exec tree"); RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator); + logger.debug("Exec tree generated."); // add fragment to local node. map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true)); + logger.debug("Fragment added to local node."); rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment)); RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); @@ -99,6 +106,7 @@ class QueryManager implements FragmentStatusListener{ workBus.setRootFragmentManager(fragmentManager); } + } // keep track of intermediate fragments (not root or leaf) @@ -112,6 +120,7 @@ class QueryManager implements FragmentStatusListener{ sendRemoteFragment(f); } + logger.debug("Fragment runs setup is complete."); } private void sendRemoteFragment(PlanFragment fragment){ diff --git a/exec/java-exec/src/main/resources/storage-engines.json b/exec/java-exec/src/main/resources/storage-engines.json new file mode 100644 index 000000000..d1d0413a8 --- /dev/null +++ b/exec/java-exec/src/main/resources/storage-engines.json @@ -0,0 +1,29 @@ +{ + "storage":{ + "parquet-local" : + { + "type":"parquet", + "dfsName" : "file:///" + }, + "parquet-cp" : + { + "type":"parquet", + "dfsName" : "classpath:///" + }, + "jsonl" : + { + "type":"json", + "dfsName" : "file:///" + }, + "json-cp" : + { + "type":"json", + "dfsName" : "classpath:///" + }, + "parquet" : + { + "type":"parquet", + "dfsName" : "file:///" + } + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java index b4dc943ba..454b15f8f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java @@ -171,12 +171,7 @@ public class TestSimpleFragmentRun extends PopUnitTestBase { } ValueVector.Accessor accessor = v.getValueVector().getAccessor(); - - if (v.getField().getType().getMinorType() == TypeProtos.MinorType.VARCHAR) { - System.out.println(new String((byte[]) accessor.getObject(r), UTF_8)); - } else { - System.out.print(accessor.getObject(r)); - } + System.out.print(accessor.getObject(r)); } if (!first) System.out.println(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java index bffc42765..2be3e8d7c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java @@ -145,10 +145,10 @@ public class JSONRecordReaderTest { assertEquals(3, addFields.size()); assertField(addFields.get(0), 0, MinorType.INT, 123, "test"); assertField(addFields.get(1), 0, MinorType.BIT, true, "b"); - assertField(addFields.get(2), 0, MinorType.VARCHAR, "hi!".getBytes(UTF_8), "c"); + assertField(addFields.get(2), 0, MinorType.VARCHAR, "hi!", "c"); assertField(addFields.get(0), 1, MinorType.INT, 1234, "test"); assertField(addFields.get(1), 1, MinorType.BIT, false, "b"); - assertField(addFields.get(2), 1, MinorType.VARCHAR, "drill!".getBytes(UTF_8), "c"); + assertField(addFields.get(2), 1, MinorType.VARCHAR, "drill!", "c"); assertEquals(0, jr.next()); assertTrue(mutator.getRemovedFields().isEmpty()); @@ -178,19 +178,19 @@ public class JSONRecordReaderTest { assertField(addFields.get(1), 0, MinorType.INT, 1, "b"); assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 2.15, "c"); assertField(addFields.get(3), 0, MinorType.BIT, true, "bool"); - assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1".getBytes(UTF_8), "str1"); + assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1", "str1"); assertField(addFields.get(0), 1, MinorType.INT, 1234, "test"); assertField(addFields.get(1), 1, MinorType.INT, 3, "b"); assertField(addFields.get(3), 1, MinorType.BIT, false, "bool"); - assertField(addFields.get(4), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1"); + assertField(addFields.get(4), 1, MinorType.VARCHAR, "test2", "str1"); assertField(addFields.get(5), 1, MinorType.INT, 4, "d"); assertField(addFields.get(0), 2, MinorType.INT, 12345, "test"); assertField(addFields.get(2), 2, MinorType.FLOAT4, (float) 5.16, "c"); assertField(addFields.get(3), 2, MinorType.BIT, true, "bool"); assertField(addFields.get(5), 2, MinorType.INT, 6, "d"); - assertField(addFields.get(6), 2, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2"); + assertField(addFields.get(6), 2, MinorType.VARCHAR, "test3", "str2"); assertTrue(mutator.getRemovedFields().isEmpty()); assertEquals(0, jr.next()); } @@ -220,14 +220,14 @@ public class JSONRecordReaderTest { assertField(addFields.get(1), 0, MinorType.INT, 1, "b"); assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 2.15, "c"); assertField(addFields.get(3), 0, MinorType.BIT, true, "bool"); - assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1".getBytes(UTF_8), "str1"); + assertField(addFields.get(4), 0, MinorType.VARCHAR, "test1", "str1"); assertTrue(removedFields.isEmpty()); assertEquals(1, jr.next()); assertEquals(6, addFields.size()); assertField(addFields.get(0), 0, MinorType.INT, 1234, "test"); assertField(addFields.get(1), 0, MinorType.INT, 3, "b"); assertField(addFields.get(3), 0, MinorType.BIT, false, "bool"); - assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2".getBytes(UTF_8), "str1"); + assertField(addFields.get(4), 0, MinorType.VARCHAR, "test2", "str1"); assertField(addFields.get(5), 0, MinorType.INT, 4, "d"); assertEquals(1, removedFields.size()); assertEquals("c", removedFields.get(0).getName()); @@ -238,7 +238,7 @@ public class JSONRecordReaderTest { assertField(addFields.get(3), 0, MinorType.BIT, true, "bool"); assertField(addFields.get(5), 0, MinorType.INT, 6, "d"); assertField(addFields.get(2), 0, MinorType.FLOAT4, (float) 5.16, "c"); - assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3".getBytes(UTF_8), "str2"); + assertField(addFields.get(6), 0, MinorType.VARCHAR, "test3", "str2"); assertEquals(2, removedFields.size()); Iterables.find(removedFields, new Predicate<MaterializedField>() { @Override @@ -274,10 +274,10 @@ public class JSONRecordReaderTest { assertEquals(2, jr.next()); assertEquals(3, addFields.size()); assertField(addFields.get(0), 0, MinorType.INT, 123, "test"); - assertField(addFields.get(1), 0, MinorType.VARCHAR, "test".getBytes(UTF_8), "a.b"); + assertField(addFields.get(1), 0, MinorType.VARCHAR, "test", "a.b"); assertField(addFields.get(2), 0, MinorType.BIT, true, "a.a.d"); assertField(addFields.get(0), 1, MinorType.INT, 1234, "test"); - assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2".getBytes(UTF_8), "a.b"); + assertField(addFields.get(1), 1, MinorType.VARCHAR, "test2", "a.b"); assertField(addFields.get(2), 1, MinorType.BIT, false, "a.a.d"); assertEquals(0, jr.next()); @@ -308,7 +308,7 @@ public class JSONRecordReaderTest { assertField(addFields.get(3), 0, MinorType.INT, Arrays.asList(7, 8, 9), "test3.b"); assertField(addFields.get(4), 0, MinorType.INT, Arrays.asList(10, 11, 12), "test3.c.d"); assertField(addFields.get(5), 0, MinorType.FLOAT4, Arrays.<Float>asList((float) 1.1, (float) 1.2, (float) 1.3), "testFloat"); - assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello".getBytes(UTF_8), "drill".getBytes(UTF_8)), "testStr"); + assertField(addFields.get(6), 0, MinorType.VARCHAR, Arrays.asList("hello", "drill"), "testStr"); assertField(addFields.get(1), 1, MinorType.INT, Arrays.asList(1, 2), "test2"); assertField(addFields.get(2), 1, MinorType.INT, Arrays.asList(7, 7, 7, 8), "test3.a"); assertField(addFields.get(5), 1, MinorType.FLOAT4, Arrays.<Float>asList((float) 2.2, (float) 2.3,(float) 2.4), "testFloat"); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java index 733cb1dd3..8efc7622d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java @@ -17,10 +17,13 @@ */ package org.apache.drill.exec.store; -import com.beust.jcommander.internal.Lists; -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import com.google.common.util.concurrent.SettableFuture; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static parquet.column.Encoding.PLAIN; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.types.TypeProtos; @@ -34,6 +37,7 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.server.Drillbit; @@ -54,10 +58,10 @@ import parquet.hadoop.metadata.CompressionCodecName; import parquet.schema.MessageType; import parquet.schema.MessageTypeParser; -import java.util.*; - -import static org.junit.Assert.*; -import static parquet.column.Encoding.PLAIN; +import com.beust.jcommander.internal.Lists; +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import com.google.common.util.concurrent.SettableFuture; public class ParquetRecordReaderTest { org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class); @@ -368,7 +372,7 @@ public class ParquetRecordReaderTest { } @Override - public void resultArrived(QueryResultBatch result) { + public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { logger.debug("result arrived in test batch listener."); if(result.getHeader().getIsLastChunk()){ future.set(null); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java index b765ed00d..9c4aeea14 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java @@ -17,9 +17,10 @@ */ package org.apache.drill.exec.store; -import com.google.common.base.Charsets; -import com.google.common.base.Stopwatch; -import com.google.common.io.Resources; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.client.DrillClient; @@ -28,6 +29,7 @@ import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.server.Drillbit; @@ -37,10 +39,9 @@ import org.junit.AfterClass; import org.junit.Ignore; import org.junit.Test; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.base.Charsets; +import com.google.common.base.Stopwatch; +import com.google.common.io.Resources; public class TestParquetPhysicalPlan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class); @@ -93,7 +94,7 @@ public class TestParquetPhysicalPlan { } @Override - public void resultArrived(QueryResultBatch result) { + public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { int rows = result.getHeader().getRowCount(); System.out.println(String.format("Result batch arrived. Number of records: %d", rows)); count.addAndGet(rows); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index ab29a9f38..52430e132 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.server.Drillbit; @@ -139,7 +140,7 @@ public class ParquetRecordReaderTest { } @Override - public void resultArrived(QueryResultBatch result) { + public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { long columnValCounter = 0; int i = 0; FieldInfo currentField; |