aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy
diff options
context:
space:
mode:
authorPaul Rogers <progers@cloudera.com>2019-02-23 17:48:21 -0800
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2019-03-11 11:48:37 +0200
commitd585452b52e94a91ae76a24550c5c476847a9cba (patch)
tree4e91dc0d55bfb5efb93348a323645d5d4ebced40 /exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy
parent4a79e2a52ad2dac64ba001645da1442f0b06fd62 (diff)
DRILL-6952: Host compliant text reader on the row set framework
The result set loader allows controlling batch sizes. The new scan framework built on top of that framework handles projection, implicit columns, null columns and more. This commit converts the "new" ("compliant") text reader to use the new framework. Options select the use of the V2 ("new") or V3 (row-set based) versions. Unit tests demonstrate V3 functionality. closes #1683
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java442
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java44
3 files changed, 403 insertions, 135 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d76c6489e..dc1f053a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.drill.shaded.guava.com.google.common.base.Functions;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -39,10 +40,16 @@ import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
+import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.impl.scan.file.BaseFileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
@@ -55,67 +62,335 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+/**
+ * Base class for various file readers.
+ * <p>
+ * This version provides a bridge between the legacy {@link RecordReader}-style
+ * readers and the newer {@link FileBatchReader} style. Over time, split the
+ * class, or provide a cleaner way to handle the differences.
+ *
+ * @param <T> the format plugin config for this reader
+ */
+
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
- @SuppressWarnings("unused")
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+ /**
+ * Defines the static, programmer-defined options for this plugin. These
+ * options are attributes of how the plugin works. The plugin config,
+ * defined in the class definition, provides user-defined options that can
+ * vary across uses of the plugin.
+ */
+
+ public static class EasyFormatConfig {
+ public BasicFormatMatcher matcher;
+ public boolean readable = true;
+ public boolean writable;
+ public boolean blockSplittable;
+ public boolean compressible;
+ public Configuration fsConf;
+ public List<String> extensions;
+ public String defaultName;
+
+ // Config options that, prior to Drill 1.15, required the plugin to
+ // override methods. Moving forward, plugins should be migrated to
+ // use this simpler form. New plugins should use these options
+ // instead of overriding methods.
+
+ public boolean supportsProjectPushdown;
+ public boolean supportsAutoPartitioning;
+ public int readerOperatorType = -1;
+ public int writerOperatorType = -1;
+ }
+
+ /**
+ * Creates the scan batch to use with the plugin. Drill supports the "classic"
+ * style of scan batch and readers, along with the newer size-aware,
+ * component-based version. The implementation of this class assembles the
+ * readers and scan batch operator as needed for each version.
+ */
+
+ public interface ScanBatchCreator {
+ CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan)
+ throws ExecutionSetupException;
+ }
+
+ /**
+ * Use the original scanner based on the {@link RecordReader} interface.
+ * Requires that the storage plugin roll its own solutions for null columns.
+ * Is not able to limit vector or batch sizes. Retained or backward
+ * compatibility with "classic" format plugins which have not yet been
+ * upgraded to use the new framework.
+ */
+
+ public static class ClassicScanBatchCreator implements ScanBatchCreator {
+
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ClassicScanBatchCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+ final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
+
+ if (! columnExplorer.isStarQuery()) {
+ scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+ columnExplorer.getTableColumns(), scan.getSelectionRoot(), scan.getPartitionDepth());
+ scan.setOperatorId(scan.getOperatorId());
+ }
+
+ final OperatorContext oContext = context.newOperatorContext(scan);
+ final DrillFileSystem dfs;
+ try {
+ dfs = oContext.newFileSystem(plugin.easyConfig().fsConf);
+ } catch (final IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+ }
+
+ final List<RecordReader> readers = new LinkedList<>();
+ final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+ Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+ final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
+ for (final FileWork work : scan.getWorkUnits()) {
+ final RecordReader recordReader = getRecordReader(
+ plugin, context, dfs, work, scan.getColumns(), scan.getUserName());
+ readers.add(recordReader);
+ final List<String> partitionValues = ColumnExplorer.listPartitionValues(
+ work.getPath(), scan.getSelectionRoot(), false);
+ final Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(
+ work.getPath(), partitionValues, supportsFileImplicitColumns);
+ implicitColumns.add(implicitValues);
+ if (implicitValues.size() > mapWithMaxColumns.size()) {
+ mapWithMaxColumns = implicitValues;
+ }
+ }
+
+ // all readers should have the same number of implicit columns, add missing ones with value null
+ final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
+ for (final Map<String, String> map : implicitColumns) {
+ map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+ }
+
+ return new ScanBatch(context, oContext, readers, implicitColumns);
+ }
+
+ /**
+ * Create a record reader given a file system, a file description and other
+ * information. For backward compatibility, calls the plugin method by
+ * default.
+ *
+ * @param plugin
+ * the plugin creating the scan
+ * @param context
+ * fragment context for the fragment running the scan
+ * @param dfs
+ * Drill's distributed file system facade
+ * @param fileWork
+ * description of the file to scan
+ * @param columns
+ * list of columns to project
+ * @param userName
+ * the name of the user performing the scan
+ * @return a scan operator
+ * @throws ExecutionSetupException
+ * if anything goes wrong
+ */
+
+ public RecordReader getRecordReader(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+ FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
+ }
+ }
+
+ /**
+ * Revised scanner based on the revised
+ * {@link ResultSetLoader} and {@link RowBatchReader} classes.
+ * Handles most projection tasks automatically. Able to limit
+ * vector and batch sizes. Use this for new format plugins.
+ */
+
+ public abstract static class ScanFrameworkCreator
+ implements ScanBatchCreator {
+
+ protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ /**
+ * Builds the revised {@link FileBatchReader}-based scan batch.
+ *
+ * @param context
+ * @param scan
+ * @return
+ * @throws ExecutionSetupException
+ */
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+
+ // Assemble the scan operator and its wrapper.
+
+ try {
+ final BaseFileScanFramework<?> framework = buildFramework(scan);
+ final Path selectionRoot = scan.getSelectionRoot();
+ if (selectionRoot != null) {
+ framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth());
+ }
+ return new OperatorRecordBatch(
+ context, scan,
+ new ScanOperatorExec(
+ framework));
+ } catch (final UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (final Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ /**
+ * Create the plugin-specific framework that manages the scan. The framework
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles "early" or "late" schema readers. A typical
+ * framework builds on standardized frameworks for files in general or text
+ * files in particular.
+ *
+ * @param scan the physical operation definition for the scan operation. Contains
+ * one or more files to read. (The Easy format plugin works only for files.)
+ * @return the scan framework which orchestrates the scan operation across
+ * potentially many files
+ * @throws ExecutionSetupException for all setup failures
+ */
+ protected abstract BaseFileScanFramework<?> buildFramework(
+ EasySubScan scan) throws ExecutionSetupException;
+ }
+
+ /**
+ * Generic framework creator for files that just use the basic file
+ * support: metadata, etc. Specialized use cases (special "columns"
+ * column, say) will require a specialized implementation.
+ */
+
+ public abstract static class FileScanFrameworkCreator extends ScanFrameworkCreator {
+
+ private final FileReaderFactory readerCreator;
+
+ public FileScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig> plugin,
+ FileReaderFactory readerCreator) {
+ super(plugin);
+ this.readerCreator = readerCreator;
+ }
+
+ @Override
+ protected FileScanFramework buildFramework(
+ EasySubScan scan) throws ExecutionSetupException {
- private final BasicFormatMatcher matcher;
+ final FileScanFramework framework = new FileScanFramework(
+ scan.getColumns(),
+ scan.getWorkUnits(),
+ plugin.easyConfig().fsConf,
+ readerCreator);
+ return framework;
+ }
+ }
+
+ private final String name;
+ private final EasyFormatConfig easyConfig;
private final DrillbitContext context;
- private final boolean readable;
- private final boolean writable;
- private final boolean blockSplittable;
- private final Configuration fsConf;
private final StoragePluginConfig storageConfig;
protected final T formatConfig;
- private final String name;
- private final boolean compressible;
+ /**
+ * Legacy constructor.
+ */
protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable, boolean blockSplittable,
- boolean compressible, List<String> extensions, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
- this.readable = readable;
- this.writable = writable;
+ StoragePluginConfig storageConfig, T formatConfig, boolean readable, boolean writable,
+ boolean blockSplittable,
+ boolean compressible, List<String> extensions, String defaultName) {
+ this.name = name == null ? defaultName : name;
+ easyConfig = new EasyFormatConfig();
+ easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions, compressible);
+ easyConfig.readable = readable;
+ easyConfig.writable = writable;
this.context = context;
- this.blockSplittable = blockSplittable;
- this.compressible = compressible;
- this.fsConf = fsConf;
+ easyConfig.blockSplittable = blockSplittable;
+ easyConfig.compressible = compressible;
+ easyConfig.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
- this.name = name == null ? defaultName : name;
}
- @Override
- public Configuration getFsConf() {
- return fsConf;
+ /**
+ * Revised constructor in which settings are gathered into a configuration object.
+ *
+ * @param name name of the plugin
+ * @param config configuration options for this plugin which determine
+ * developer-defined runtime behavior
+ * @param context the global server-wide drillbit context
+ * @param storageConfig the configuration for the storage plugin that owns this
+ * format plugin
+ * @param formatConfig the Jackson-serialized format configuration as created
+ * by the user in the Drill web console. Holds user-defined options.
+ */
+
+ protected EasyFormatPlugin(String name, EasyFormatConfig config, DrillbitContext context,
+ StoragePluginConfig storageConfig, T formatConfig) {
+ this.name = name;
+ this.easyConfig = config;
+ this.context = context;
+ this.storageConfig = storageConfig;
+ this.formatConfig = formatConfig;
+ if (easyConfig.matcher == null) {
+ easyConfig.matcher = new BasicFormatMatcher(this,
+ easyConfig.fsConf, easyConfig.extensions,
+ easyConfig.compressible);
+ }
}
@Override
- public DrillbitContext getContext() {
- return context;
- }
+ public Configuration getFsConf() { return easyConfig.fsConf; }
@Override
- public String getName() {
- return name;
- }
+ public DrillbitContext getContext() { return context; }
- public abstract boolean supportsPushDown();
+ public EasyFormatConfig easyConfig() { return easyConfig; }
+
+ @Override
+ public String getName() { return name; }
/**
- * Whether or not you can split the format based on blocks within file boundaries. If not, the simple format engine will
- * only split on file boundaries.
+ * Does this plugin support projection push down? That is, can the reader
+ * itself handle the tasks of projecting table columns, creating null
+ * columns for missing table columns, and so on?
*
- * @return True if splittable.
+ * @return <tt>true</tt> if the plugin supports projection push-down,
+ * <tt>false</tt> if Drill should do the task by adding a project operator
*/
- public boolean isBlockSplittable() {
- return blockSplittable;
- }
+
+ public boolean supportsPushDown() { return easyConfig.supportsProjectPushdown; }
+
+ /**
+ * Whether or not you can split the format based on blocks within file
+ * boundaries. If not, the simple format engine will only split on file
+ * boundaries.
+ *
+ * @return <code>true</code> if splittable.
+ */
+ public boolean isBlockSplittable() { return easyConfig.blockSplittable; }
/**
* Indicates whether or not this format could also be in a compression
@@ -125,52 +400,40 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*
* @return <code>true</code> if it is compressible
*/
- public boolean isCompressible() {
- return compressible;
- }
-
- public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
- List<SchemaPath> columns, String userName) throws ExecutionSetupException;
+ public boolean isCompressible() { return easyConfig.compressible; }
- CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
- final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns());
-
- if (!columnExplorer.isStarQuery()) {
- scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
- columnExplorer.getTableColumns(), scan.getSelectionRoot());
- scan.setOperatorId(scan.getOperatorId());
- }
+ /**
+ * Return a record reader for the specific file format, when using the original
+ * {@link ScanBatch} scanner.
+ * @param context fragment context
+ * @param dfs Drill file system
+ * @param fileWork metadata about the file to be scanned
+ * @param columns list of projected columns (or may just contain the wildcard)
+ * @param userName the name of the user running the query
+ * @return a record reader for this format
+ * @throws ExecutionSetupException for many reasons
+ */
- OperatorContext oContext = context.newOperatorContext(scan);
- final DrillFileSystem dfs;
- try {
- dfs = oContext.newFileSystem(fsConf);
- } catch (IOException e) {
- throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
- }
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ throw new ExecutionSetupException("Must implement getRecordReader() if using the legacy scanner.");
+ }
- List<RecordReader> readers = new LinkedList<>();
- List<Map<String, String>> implicitColumns = Lists.newArrayList();
- Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
- boolean supportsFileImplicitColumns = scan.getSelectionRoot() != null;
- for (FileWork work : scan.getWorkUnits()){
- RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
- readers.add(recordReader);
- List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false);
- Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
- implicitColumns.add(implicitValues);
- if (implicitValues.size() > mapWithMaxColumns.size()) {
- mapWithMaxColumns = implicitValues;
- }
- }
+ protected CloseableRecordBatch getReaderBatch(final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+ return scanBatchCreator(context.getOptions()).buildScan(context, scan);
+ }
- // all readers should have the same number of implicit columns, add missing ones with value null
- Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
- for (Map<String, String> map : implicitColumns) {
- map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
- }
+ /**
+ * Create the scan batch creator. Needed only when using the revised scan batch. In that
+ * case, override the <tt>readerIterator()</tt> method on the custom scan batch
+ * creator implementation.
+ *
+ * @return the strategy for creating the scan batch for this plugin
+ */
- return new ScanBatch(context, oContext, readers, implicitColumns);
+ protected ScanBatchCreator scanBatchCreator(OptionManager options) {
+ return new ClassicScanBatchCreator(this);
}
public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter writer) {
@@ -219,41 +482,28 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
@Override
- public T getConfig() {
- return formatConfig;
- }
+ public T getConfig() { return formatConfig; }
@Override
- public StoragePluginConfig getStorageConfig() {
- return storageConfig;
- }
+ public StoragePluginConfig getStorageConfig() { return storageConfig; }
@Override
- public boolean supportsRead() {
- return readable;
- }
+ public boolean supportsRead() { return easyConfig.readable; }
@Override
- public boolean supportsWrite() {
- return writable;
- }
+ public boolean supportsWrite() { return easyConfig.writable; }
@Override
- public boolean supportsAutoPartitioning() {
- return false;
- }
+ public boolean supportsAutoPartitioning() { return easyConfig.supportsAutoPartitioning; }
@Override
- public FormatMatcher getMatcher() {
- return matcher;
- }
+ public FormatMatcher getMatcher() { return easyConfig.matcher; }
@Override
public Set<StoragePluginOptimizerRule> getOptimizerRules() {
return ImmutableSet.of();
}
- public abstract int getReaderOperatorType();
- public abstract int getWriterOperatorType();
-
+ public int getReaderOperatorType() { return easyConfig.readerOperatorType; }
+ public int getWriterOperatorType() { return easyConfig.writerOperatorType; }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 4449ec054..6a6243cd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
@@ -57,9 +58,11 @@ import org.apache.hadoop.fs.Path;
public class EasyGroupScan extends AbstractFileGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyGroupScan.class);
- private FileSelection selection;
private final EasyFormatPlugin<?> formatPlugin;
+ private FileSelection selection;
+ private int partitionDepth;
private int maxWidth;
+ private int minWidth = 1;
private List<SchemaPath> columns;
private ListMultimap<Integer, CompleteFileWork> mappings;
@@ -104,9 +107,23 @@ public class EasyGroupScan extends AbstractFileGroupScan {
initFromSelection(selection, formatPlugin);
}
- @JsonIgnore
- public Iterable<CompleteFileWork> getWorkIterable() {
- return () -> Iterators.unmodifiableIterator(chunks.iterator());
+ public EasyGroupScan(
+ String userName,
+ FileSelection selection,
+ EasyFormatPlugin<?> formatPlugin,
+ List<SchemaPath> columns,
+ Path selectionRoot,
+ int minWidth
+ ) throws IOException{
+ this(userName, selection, formatPlugin, columns, selectionRoot);
+
+ // Set the minimum width of this reader. Primarily used for testing
+ // to force parallelism even for small test files.
+ // See ExecConstants.MIN_READER_WIDTH
+ this.minWidth = Math.max(1, Math.min(minWidth, maxWidth));
+
+ // Compute the maximum partition depth across all files.
+ partitionDepth = ColumnExplorer.getPartitionDepth(selection);
}
private EasyGroupScan(final EasyGroupScan that) {
@@ -118,17 +135,23 @@ public class EasyGroupScan extends AbstractFileGroupScan {
chunks = that.chunks;
endpointAffinities = that.endpointAffinities;
maxWidth = that.maxWidth;
+ minWidth = that.minWidth;
mappings = that.mappings;
+ partitionDepth = that.partitionDepth;
+ }
+
+ @JsonIgnore
+ public Iterable<CompleteFileWork> getWorkIterable() {
+ return () -> Iterators.unmodifiableIterator(chunks.iterator());
}
private void initFromSelection(FileSelection selection, EasyFormatPlugin<?> formatPlugin) throws IOException {
- @SuppressWarnings("resource")
final DrillFileSystem dfs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf());
this.selection = selection;
BlockMapBuilder b = new BlockMapBuilder(dfs, formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable());
- this.maxWidth = chunks.size();
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ chunks = b.generateFileWork(selection.getStatuses(dfs), formatPlugin.isBlockSplittable());
+ maxWidth = chunks.size();
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
public Path getSelectionRoot() {
@@ -136,11 +159,16 @@ public class EasyGroupScan extends AbstractFileGroupScan {
}
@Override
+ @JsonIgnore
+ public int getMinParallelizationWidth() {
+ return minWidth;
+ }
+
+ @Override
public int getMaxParallelizationWidth() {
return maxWidth;
}
-
@Override
public ScanStats getScanStats(final PlannerSettings settings) {
return formatPlugin.getScanStats(settings, this);
@@ -163,7 +191,6 @@ public class EasyGroupScan extends AbstractFileGroupScan {
return columns;
}
-
@JsonIgnore
public FileSelection getFileSelection() {
return selection;
@@ -180,7 +207,6 @@ public class EasyGroupScan extends AbstractFileGroupScan {
return new EasyGroupScan(this);
}
-
@Override
public List<EndpointAffinity> getOperatorAffinity() {
if (endpointAffinities == null) {
@@ -217,7 +243,8 @@ public class EasyGroupScan extends AbstractFileGroupScan {
Preconditions.checkArgument(!filesForMinor.isEmpty(),
String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
- EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+ EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin,
+ columns, selectionRoot, partitionDepth);
subScan.setOperatorId(this.getOperatorId());
return subScan;
}
@@ -275,5 +302,4 @@ public class EasyGroupScan extends AbstractFileGroupScan {
public boolean canPushdownProjects(List<SchemaPath> columns) {
return formatPlugin.supportsPushDown();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index fbb3f475c..c51c7ac0f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -42,7 +42,8 @@ public class EasySubScan extends AbstractSubScan{
private final List<FileWorkImpl> files;
private final EasyFormatPlugin<?> formatPlugin;
private final List<SchemaPath> columns;
- private Path selectionRoot;
+ private final Path selectionRoot;
+ private final int partitionDepth;
@JsonCreator
public EasySubScan(
@@ -52,7 +53,8 @@ public class EasySubScan extends AbstractSubScan{
@JsonProperty("format") FormatPluginConfig formatConfig,
@JacksonInject StoragePluginRegistry engineRegistry,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("selectionRoot") Path selectionRoot
+ @JsonProperty("selectionRoot") Path selectionRoot,
+ @JsonProperty("partitionDepth") int partitionDepth
) throws ExecutionSetupException {
super(userName);
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
@@ -60,50 +62,40 @@ public class EasySubScan extends AbstractSubScan{
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
+ this.partitionDepth = partitionDepth;
}
- public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
- Path selectionRoot){
+ public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin,
+ List<SchemaPath> columns, Path selectionRoot, int partitionDepth) {
super(userName);
this.formatPlugin = plugin;
this.files = files;
this.columns = columns;
this.selectionRoot = selectionRoot;
+ this.partitionDepth = partitionDepth;
}
@JsonProperty
- public Path getSelectionRoot() {
- return selectionRoot;
- }
+ public Path getSelectionRoot() { return selectionRoot; }
+
+ @JsonProperty
+ public int getPartitionDepth() { return partitionDepth; }
@JsonIgnore
- public EasyFormatPlugin<?> getFormatPlugin(){
- return formatPlugin;
- }
+ public EasyFormatPlugin<?> getFormatPlugin() { return formatPlugin; }
@JsonProperty("files")
- public List<FileWorkImpl> getWorkUnits() {
- return files;
- }
+ public List<FileWorkImpl> getWorkUnits() { return files; }
@JsonProperty("storage")
- public StoragePluginConfig getStorageConfig(){
- return formatPlugin.getStorageConfig();
- }
+ public StoragePluginConfig getStorageConfig() { return formatPlugin.getStorageConfig(); }
@JsonProperty("format")
- public FormatPluginConfig getFormatConfig(){
- return formatPlugin.getConfig();
- }
+ public FormatPluginConfig getFormatConfig() { return formatPlugin.getConfig(); }
@JsonProperty("columns")
- public List<SchemaPath> getColumns(){
- return columns;
- }
+ public List<SchemaPath> getColumns() { return columns; }
@Override
- public int getOperatorType() {
- return formatPlugin.getReaderOperatorType();
- }
-
+ public int getOperatorType() { return formatPlugin.getReaderOperatorType(); }
}