aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java53
1 files changed, 28 insertions, 25 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 20b06c430..920f9b2bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.MajorTypeSerDe;
+import org.apache.drill.exec.serialization.PathSerDe;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
@@ -44,43 +45,43 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
public class PhysicalPlanReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
private final ObjectReader physicalPlanReader;
private final ObjectMapper mapper;
private final ObjectReader operatorReader;
private final ObjectReader logicalPlanReader;
- public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint,
- final StoragePluginRegistry pluginRegistry) {
+ public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance,
+ final DrillbitEndpoint endpoint, final StoragePluginRegistry pluginRegistry) {
ObjectMapper lpMapper = lpPersistance.getMapper();
// Endpoint serializer/deserializer.
- SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
- .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) //
- .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
+ SimpleModule serDeModule = new SimpleModule("PhysicalOperatorModule")
+ .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se())
+ .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De())
.addSerializer(MajorType.class, new MajorTypeSerDe.Se())
.addDeserializer(MajorType.class, new MajorTypeSerDe.De())
.addDeserializer(DynamicPojoRecordReader.class,
- new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)));
+ new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)))
+ .addSerializer(Path.class, new PathSerDe.Se());
- lpMapper.registerModule(deserModule);
+ lpMapper.registerModule(serDeModule);
Set<Class<? extends PhysicalOperator>> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult);
- for (Class<? extends PhysicalOperator> subType : subTypes) {
- lpMapper.registerSubtypes(subType);
- }
+ subTypes.forEach(lpMapper::registerSubtypes);
lpMapper.registerSubtypes(DynamicPojoRecordReader.class);
- InjectableValues injectables = new InjectableValues.Std() //
- .addValue(StoragePluginRegistry.class, pluginRegistry) //
- .addValue(DrillbitEndpoint.class, endpoint); //
+ InjectableValues injectables = new InjectableValues.Std()
+ .addValue(StoragePluginRegistry.class, pluginRegistry)
+ .addValue(DrillbitEndpoint.class, endpoint);
this.mapper = lpMapper;
- this.physicalPlanReader = mapper.reader(PhysicalPlan.class).with(injectables);
- this.operatorReader = mapper.reader(PhysicalOperator.class).with(injectables);
- this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables);
+ this.physicalPlanReader = mapper.readerFor(PhysicalPlan.class).with(injectables);
+ this.operatorReader = mapper.readerFor(PhysicalOperator.class).with(injectables);
+ this.logicalPlanReader = mapper.readerFor(LogicalPlan.class).with(injectables);
}
public String writeJson(OptionList list) throws JsonProcessingException{
@@ -91,33 +92,35 @@ public class PhysicalPlanReader {
return mapper.writeValueAsString(op);
}
- public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException {
+ public PhysicalPlan readPhysicalPlan(String json) throws IOException {
logger.debug("Reading physical plan {}", json);
return physicalPlanReader.readValue(json);
}
- public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException {
+ public FragmentRoot readFragmentRoot(String json) throws IOException {
logger.debug("Attempting to read {}", json);
PhysicalOperator op = operatorReader.readValue(json);
- if(op instanceof FragmentRoot){
+ if (op instanceof FragmentRoot) {
return (FragmentRoot) op;
- }else{
- throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator. The operator was %s.", op.getClass().getCanonicalName()));
+ } else {
+ throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot " +
+ "as its root operator. The operator was %s.", op.getClass().getCanonicalName()));
}
}
@VisibleForTesting
- public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException {
+ public FragmentLeaf readFragmentLeaf(String json) throws IOException {
logger.debug("Attempting to read {}", json);
PhysicalOperator op = operatorReader.readValue(json);
if (op instanceof FragmentLeaf){
return (FragmentLeaf) op;
} else {
- throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName()));
+ throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. " +
+ "The operator was %s.", op.getClass().getCanonicalName()));
}
}
- public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{
+ public LogicalPlan readLogicalPlan(String json) throws IOException{
logger.debug("Reading logical plan {}", json);
return logicalPlanReader.readValue(json);
}