diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java index 20b06c430..920f9b2bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.record.MajorTypeSerDe; +import org.apache.drill.exec.serialization.PathSerDe; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; @@ -44,43 +45,43 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.Path; public class PhysicalPlanReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class); private final ObjectReader physicalPlanReader; private final ObjectMapper mapper; private final ObjectReader operatorReader; private final ObjectReader logicalPlanReader; - public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint, - final StoragePluginRegistry pluginRegistry) { + public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, + final DrillbitEndpoint endpoint, final StoragePluginRegistry pluginRegistry) { ObjectMapper lpMapper = lpPersistance.getMapper(); // Endpoint serializer/deserializer. - SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") // - .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) // - .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) // + SimpleModule serDeModule = new SimpleModule("PhysicalOperatorModule") + .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) + .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) .addSerializer(MajorType.class, new MajorTypeSerDe.Se()) .addDeserializer(MajorType.class, new MajorTypeSerDe.De()) .addDeserializer(DynamicPojoRecordReader.class, - new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper))); + new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper))) + .addSerializer(Path.class, new PathSerDe.Se()); - lpMapper.registerModule(deserModule); + lpMapper.registerModule(serDeModule); Set<Class<? extends PhysicalOperator>> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult); - for (Class<? extends PhysicalOperator> subType : subTypes) { - lpMapper.registerSubtypes(subType); - } + subTypes.forEach(lpMapper::registerSubtypes); lpMapper.registerSubtypes(DynamicPojoRecordReader.class); - InjectableValues injectables = new InjectableValues.Std() // - .addValue(StoragePluginRegistry.class, pluginRegistry) // - .addValue(DrillbitEndpoint.class, endpoint); // + InjectableValues injectables = new InjectableValues.Std() + .addValue(StoragePluginRegistry.class, pluginRegistry) + .addValue(DrillbitEndpoint.class, endpoint); this.mapper = lpMapper; - this.physicalPlanReader = mapper.reader(PhysicalPlan.class).with(injectables); - this.operatorReader = mapper.reader(PhysicalOperator.class).with(injectables); - this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables); + this.physicalPlanReader = mapper.readerFor(PhysicalPlan.class).with(injectables); + this.operatorReader = mapper.readerFor(PhysicalOperator.class).with(injectables); + this.logicalPlanReader = mapper.readerFor(LogicalPlan.class).with(injectables); } public String writeJson(OptionList list) throws JsonProcessingException{ @@ -91,33 +92,35 @@ public class PhysicalPlanReader { return mapper.writeValueAsString(op); } - public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException { + public PhysicalPlan readPhysicalPlan(String json) throws IOException { logger.debug("Reading physical plan {}", json); return physicalPlanReader.readValue(json); } - public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException { + public FragmentRoot readFragmentRoot(String json) throws IOException { logger.debug("Attempting to read {}", json); PhysicalOperator op = operatorReader.readValue(json); - if(op instanceof FragmentRoot){ + if (op instanceof FragmentRoot) { return (FragmentRoot) op; - }else{ - throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator. The operator was %s.", op.getClass().getCanonicalName())); + } else { + throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot " + + "as its root operator. The operator was %s.", op.getClass().getCanonicalName())); } } @VisibleForTesting - public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException { + public FragmentLeaf readFragmentLeaf(String json) throws IOException { logger.debug("Attempting to read {}", json); PhysicalOperator op = operatorReader.readValue(json); if (op instanceof FragmentLeaf){ return (FragmentLeaf) op; } else { - throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName())); + throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. " + + "The operator was %s.", op.getClass().getCanonicalName())); } } - public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{ + public LogicalPlan readLogicalPlan(String json) throws IOException{ logger.debug("Reading logical plan {}", json); return logicalPlanReader.readValue(json); } |