From a44cb9170c660cd9ee66750b0917d562942df1fb Mon Sep 17 00:00:00 2001 From: Mehant Baid Date: Wed, 17 Sep 2014 16:50:42 -0700 Subject: DRILL-1432: Propagate user credentials from client to the fragments --- .../org/apache/drill/exec/client/DrillClient.java | 24 ++++++++++++++++++++-- .../org/apache/drill/exec/ops/FragmentContext.java | 7 +++++++ .../exec/planner/fragment/SimpleParallelizer.java | 9 +++++--- .../org/apache/drill/exec/rpc/user/UserClient.java | 6 ++++-- .../apache/drill/exec/rpc/user/UserSession.java | 4 ++++ .../apache/drill/exec/work/foreman/Foreman.java | 2 +- .../apache/drill/exec/pop/TestFragmentChecker.java | 5 ++++- 7 files changed, 48 insertions(+), 9 deletions(-) (limited to 'exec') diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 74cc6a6e7..2c9d2fef5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.proto.UserProtos; @@ -199,7 +200,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private void connect(DrillbitEndpoint endpoint) throws RpcException { FutureHandler f = new FutureHandler(); try { - client.connect(f, endpoint, props); + client.connect(f, endpoint, props, getUserCredentials()); f.checkedGet(); } catch (InterruptedException e) { throw new RpcException(e); @@ -247,6 +248,26 @@ public class DrillClient implements Closeable, ConnectionThrottle{ return listener.getResults(); } + + /* + * Helper method to generate the UserCredentials message from the properties. + */ + private UserBitShared.UserCredentials getUserCredentials() { + // If username is not propagated as one of the properties + String userName = "anonymous"; + + if (props != null) { + for (Property property: props.getPropertiesList()) { + if (property.getKey().equalsIgnoreCase("user")) { + userName = property.getValue(); + break; + } + } + } + + return UserBitShared.UserCredentials.newBuilder().setUserName(userName).build(); + } + public DrillRpcFuture cancelQuery(QueryId id) { logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id)); return client.send(RpcType.CANCEL_QUERY, id, Ack.class); @@ -350,5 +371,4 @@ public class DrillClient implements Closeable, ConnectionThrottle{ } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 04e19371d..0564c1ae9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; @@ -75,6 +76,7 @@ public class FragmentContext implements Closeable { private final long queryStartTime; private final int rootFragmentTimeZone; private final OptionManager fragmentOptions; + private final UserCredentials credentials; private LongObjectOpenHashMap managedBuffers = new LongObjectOpenHashMap<>(); private volatile Throwable failureCause; @@ -91,6 +93,7 @@ public class FragmentContext implements Closeable { this.funcRegistry = funcRegistry; this.queryStartTime = fragment.getQueryStartTime(); this.rootFragmentTimeZone = fragment.getTimeZone(); + this.credentials = fragment.getCredentials(); logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); logger.debug("Fragment max allocation: {}", fragment.getMemMax()); try { @@ -259,6 +262,10 @@ public class FragmentContext implements Closeable { return funcRegistry; } + public UserCredentials getCredentials() { + return credentials; + } + public QueryClassLoader getClassLoader() { return loader; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index bf4dae704..cd37c1792 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -34,7 +34,9 @@ import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.work.QueryWorkUnit; @@ -89,13 +91,13 @@ public class SimpleParallelizer { * @throws ExecutionSetupException */ public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection activeEndpoints, - PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException { + PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, UserSession session) throws ExecutionSetupException { assignEndpoints(activeEndpoints, planningSet); - return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet); + return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session); } private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, PhysicalPlanReader reader, Fragment rootNode, - PlanningSet planningSet) throws ExecutionSetupException { + PlanningSet planningSet, UserSession session) throws ExecutionSetupException { List fragments = Lists.newArrayList(); @@ -157,6 +159,7 @@ public class SimpleParallelizer { .setMemInitial(wrapper.getInitialAllocation())// .setMemMax(wrapper.getMaxAllocation()) .setOptionsJson(optionsData) + .setCredentials(session.getCredentials()) .build(); if (isRootNode) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 4df6bfea2..4e7fc925e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -23,6 +23,7 @@ import io.netty.channel.EventLoopGroup; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; @@ -55,12 +56,13 @@ public class UserClient extends BasicClientWithConnection handler, DrillbitEndpoint endpoint, UserProperties props) + public void connect(RpcConnectionHandler handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials) throws RpcException, InterruptedException { UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) .setSupportListening(true) - .setSupportComplexTypes(supportComplexTypes); + .setSupportComplexTypes(supportComplexTypes) + .setCredentials(credentials); if (props != null) { hsBuilder.setProperties(props); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index d196743eb..efb0cdf93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -99,6 +99,10 @@ public class UserSession { return user; } + public UserCredentials getCredentials() { + return credentials; + } + /** * Update the schema path for the session. * @param fullPath The desired path to set to. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 1e5d8b8f4..0a34a2201 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -373,7 +373,7 @@ public class Foreman implements Runnable, Closeable, Comparable{ } QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(), - queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet); + queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, initiatingClient.getSession()); this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager); List leafFragments = Lists.newArrayList(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index 58ddd0663..6349b767c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -26,7 +26,9 @@ import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.StatsCollector; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.work.QueryWorkUnit; import org.junit.Test; @@ -59,7 +61,8 @@ public class TestFragmentChecker extends PopUnitTestBase{ endpoints.add(b1); } - QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet); + QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, + UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build()); System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId())); System.out.print(qwu.getRootFragment().getFragmentJson()); -- cgit v1.2.3