aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorMehant Baid <mehantr@gmail.com>2014-09-17 16:50:42 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-09-29 18:21:44 -0700
commita44cb9170c660cd9ee66750b0917d562942df1fb (patch)
tree6d3ec502f7ed38055dc93d6abec1fa2f0837bb8d /exec
parent1c40b5e745ab20da6725568c9e4790f34ae73c8f (diff)
DRILL-1432: Propagate user credentials from client to the fragments
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java24
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java2
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java5
7 files changed, 48 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 74cc6a6e7..2c9d2fef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
@@ -199,7 +200,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private void connect(DrillbitEndpoint endpoint) throws RpcException {
FutureHandler f = new FutureHandler();
try {
- client.connect(f, endpoint, props);
+ client.connect(f, endpoint, props, getUserCredentials());
f.checkedGet();
} catch (InterruptedException e) {
throw new RpcException(e);
@@ -247,6 +248,26 @@ public class DrillClient implements Closeable, ConnectionThrottle{
return listener.getResults();
}
+
+ /*
+ * Helper method to generate the UserCredentials message from the properties.
+ */
+ private UserBitShared.UserCredentials getUserCredentials() {
+ // If username is not propagated as one of the properties
+ String userName = "anonymous";
+
+ if (props != null) {
+ for (Property property: props.getPropertiesList()) {
+ if (property.getKey().equalsIgnoreCase("user")) {
+ userName = property.getValue();
+ break;
+ }
+ }
+ }
+
+ return UserBitShared.UserCredentials.newBuilder().setUserName(userName).build();
+ }
+
public DrillRpcFuture<Ack> cancelQuery(QueryId id) {
logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
@@ -350,5 +371,4 @@ public class DrillClient implements Closeable, ConnectionThrottle{
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 04e19371d..0564c1ae9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -75,6 +76,7 @@ public class FragmentContext implements Closeable {
private final long queryStartTime;
private final int rootFragmentTimeZone;
private final OptionManager fragmentOptions;
+ private final UserCredentials credentials;
private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
private volatile Throwable failureCause;
@@ -91,6 +93,7 @@ public class FragmentContext implements Closeable {
this.funcRegistry = funcRegistry;
this.queryStartTime = fragment.getQueryStartTime();
this.rootFragmentTimeZone = fragment.getTimeZone();
+ this.credentials = fragment.getCredentials();
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
try {
@@ -259,6 +262,10 @@ public class FragmentContext implements Closeable {
return funcRegistry;
}
+ public UserCredentials getCredentials() {
+ return credentials;
+ }
+
public QueryClassLoader getClassLoader() {
return loader;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index bf4dae704..cd37c1792 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -34,7 +34,9 @@ import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.QueryWorkUnit;
@@ -89,13 +91,13 @@ public class SimpleParallelizer {
* @throws ExecutionSetupException
*/
public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints,
- PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException {
+ PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, UserSession session) throws ExecutionSetupException {
assignEndpoints(activeEndpoints, planningSet);
- return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet);
+ return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session);
}
private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode,
- PlanningSet planningSet) throws ExecutionSetupException {
+ PlanningSet planningSet, UserSession session) throws ExecutionSetupException {
List<PlanFragment> fragments = Lists.newArrayList();
@@ -157,6 +159,7 @@ public class SimpleParallelizer {
.setMemInitial(wrapper.getInitialAllocation())//
.setMemMax(wrapper.getMaxAllocation())
.setOptionsJson(optionsData)
+ .setCredentials(session.getCredentials())
.build();
if (isRootNode) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 4df6bfea2..4e7fc925e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -23,6 +23,7 @@ import io.netty.channel.EventLoopGroup;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
@@ -55,12 +56,13 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
- public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props)
+ public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
throws RpcException, InterruptedException {
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
- .setSupportComplexTypes(supportComplexTypes);
+ .setSupportComplexTypes(supportComplexTypes)
+ .setCredentials(credentials);
if (props != null) {
hsBuilder.setProperties(props);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index d196743eb..efb0cdf93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -99,6 +99,10 @@ public class UserSession {
return user;
}
+ public UserCredentials getCredentials() {
+ return credentials;
+ }
+
/**
* Update the schema path for the session.
* @param fullPath The desired path to set to.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 1e5d8b8f4..0a34a2201 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -373,7 +373,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
}
QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
- queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet);
+ queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, initiatingClient.getSession());
this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager);
List<PlanFragment> leafFragments = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 58ddd0663..6349b767c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -26,7 +26,9 @@ import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.junit.Test;
@@ -59,7 +61,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
endpoints.add(b1);
}
- QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet);
+ QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet,
+ UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build());
System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
System.out.print(qwu.getRootFragment().getFragmentJson());