/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl;

import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Record batch used for a particular scan. Operates against one or more
 * underlying {@link RecordReader}s, reading each in turn and exposing the
 * results as a stream of record batches.
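 *
 * <p>A minimal sketch of how a downstream caller typically drives this batch;
 * the {@code subScanConfig}, {@code context} and {@code readers} names are
 * placeholders assumed to come from the caller, and checked exceptions are elided:
 * <pre>{@code
 * ScanBatch scan = new ScanBatch(subScanConfig, context, readers);
 * IterOutcome outcome;
 * while ((outcome = scan.next()) != IterOutcome.NONE) {
 *   // consume scan.getRecordCount() rows from the outgoing container
 * }
 * scan.close();
 * }</pre>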
 */
public class ScanBatch implements CloseableRecordBatch {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
  private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScanBatch.class);

  /** Main collection of fields' value vectors. */
  private final VectorContainer container = new VectorContainer();
  private int recordCount;
  private final FragmentContext context;
  private final OperatorContext oContext;
  private Iterator<RecordReader> readers;
  private RecordReader currentReader;
  private BatchSchema schema;
  private final Mutator mutator;
  private boolean done;
  private Iterator<Map<String, String>> implicitColumns;
  private Map<String, String> implicitValues;
  private final BufferAllocator allocator;
  private final List<Map<String, String>> implicitColumnList;
  private String currentReaderClassName;
  private final RecordBatchStatsContext batchStatsContext;
  // Represents the last outcome of next(). If an exception is thrown
  // during the method's execution, IterOutcome.STOP is assigned instead.
  private IterOutcome lastOutcome;
  private List<RecordReader> readerList; // needed for repeatable scanners
  private boolean isRepeatableScan;      // needed for repeatable scanners

  /**
   * @param context fragment context
   * @param oContext operator context
   * @param readerList list of readers for this scan
   * @param implicitColumnList either an empty list when none of the readers have implicit
   *        columns, or a list with a one-to-one mapping between readers and their implicit columns.
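   *
   *        <p>For illustration only (the file names and column keys below are
   *        hypothetical), a two-reader scan might pass:
   *        <pre>{@code
   *        List<Map<String, String>> implicitColumnList = Arrays.asList(
   *            ImmutableMap.of("fqn", "/data/a.csv", "suffix", "csv"),
   *            ImmutableMap.of("fqn", "/data/b.csv", "suffix", "csv"));
   *        }</pre>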
   */
  public ScanBatch(FragmentContext context,
                   OperatorContext oContext,
                   List<RecordReader> readerList,
                   List<Map<String, String>> implicitColumnList) {
    this.context = context;
    this.readers = readerList.iterator();
    this.implicitColumns = implicitColumnList.iterator();
    if (!readers.hasNext()) {
      throw UserException.internalError(
          new ExecutionSetupException("A scan batch must contain at least one reader."))
        .build(logger);
    }

    this.oContext = oContext;
    allocator = oContext.getAllocator();
    mutator = new Mutator(oContext, allocator, container);

    oContext.getStats().startProcessing();
    try {
      if (!verifyImplicitColumns(readerList.size(), implicitColumnList)) {
        Exception ex = new ExecutionSetupException(
            "Either the implicit column list does not have the same cardinality as the reader list, "
            + "or the implicit columns are not the same across all the record readers!");
        throw UserException.internalError(ex)
            .addContext("Setup failed for", readerList.get(0).getClass().getSimpleName())
            .build(logger);
      }

      this.implicitColumnList = implicitColumnList;
      addImplicitVectors();
      currentReader = null;
      batchStatsContext = new RecordBatchStatsContext(context, oContext);
    } finally {
      oContext.getStats().stopProcessing();
    }
  }

  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
                   List<RecordReader> readers)
      throws ExecutionSetupException {
    this(context, context.newOperatorContext(subScanConfig), readers,
        Collections.<Map<String, String>>emptyList());
  }

  public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
                   List<RecordReader> readerList, boolean isRepeatableScan)
      throws ExecutionSetupException {
    this(context, context.newOperatorContext(subScanConfig), readerList,
        Collections.<Map<String, String>>emptyList());
    this.readerList = readerList;
    this.isRepeatableScan = isRepeatableScan;
  }

  @Override
  public FragmentContext getContext() {
    return context;
  }

  @Override
  public BatchSchema getSchema() {
    return schema;
  }

  @Override
  public int getRecordCount() {
    return recordCount;
  }

  @Override
  public void kill(boolean sendUpstream) {
    if (sendUpstream) {
      done = true;
    } else {
      releaseAssets();
    }
  }

  /**
   * Performs the scan-specific actions needed when the scan must clean up (or,
   * for repeatable scans, reset its readers) and report NONE.
   * @return {@link IterOutcome#NONE}
   */
  private IterOutcome cleanAndReturnNone() {
    if (isRepeatableScan) {
      readers = readerList.iterator();
      return IterOutcome.NONE;
    } else {
      releaseAssets(); // All data has been read. Release resources.
      done = true;
      return IterOutcome.NONE;
    }
  }

  /**
   * Called when the current reader returns zero records: updates the reader
   * state accordingly and decides whether the iteration should continue.
   * @return whether we should continue the iteration
   * @throws Exception
   */
  private boolean shouldContinueAfterNoRecords() throws Exception {
    logger.trace("scan got 0 records.");
    if (isRepeatableScan) {
      if (!currentReader.hasNext()) {
        currentReader = null;
        readers = readerList.iterator();
        return false;
      }
      return true;
    } else { // Regular scan
      currentReader.close();
      currentReader = null;
      // In the regular case we always continue the iteration; if there is no
      // more reader, we break out at the head of the loop.
      return true;
    }
  }

  private IterOutcome internalNext() throws Exception {
    while (true) {
      if (currentReader == null && !getNextReaderIfHas()) {
        logger.trace("currentReader is null");
        return cleanAndReturnNone();
      }
      injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
      currentReader.allocate(mutator.fieldVectorMap());

      recordCount = currentReader.next();
      logger.trace("currentReader.next() returned recordCount={}", recordCount);
      Preconditions.checkArgument(recordCount >= 0,
          "recordCount from RecordReader.next() should not be negative");
      boolean isNewSchema = mutator.isNewSchema();
      populateImplicitVectorsAndSetCount();
      oContext.getStats().batchReceived(0, recordCount, isNewSchema);

      boolean toContinueIter = true;
      if (recordCount == 0) {
        // If we got 0 records we may need to clean up and exit, but we may also
        // need to return a new schema in the code block below, so we use
        // toContinueIter to record the decision whether the iteration continues.
        toContinueIter = shouldContinueAfterNoRecords();
      }

      if (isNewSchema) {
        // Even when recordCount == 0, we should return OK_NEW_SCHEMA if the current
        // reader presents a new schema. This could happen when data sources have a
        // non-trivial schema with 0 rows.
        container.buildSchema(SelectionVectorMode.NONE);
        schema = container.getSchema();
        lastOutcome = IterOutcome.OK_NEW_SCHEMA;
        return lastOutcome;
      }

      // Handle the case of an unchanged schema.
      if (recordCount == 0) {
        if (toContinueIter) {
          // Skip to the next loop iteration if the reader returned 0 rows with the same schema.
          continue;
        } else {
          // Return NONE if recordCount == 0 && !isNewSchema
          lastOutcome = IterOutcome.NONE;
          return lastOutcome;
        }
      } else {
        // Return OK if recordCount > 0 && !isNewSchema
        lastOutcome = IterOutcome.OK;
        return lastOutcome;
      }
    }
  }
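  // Summary of the outcomes produced by internalNext(), as implemented above:
  //   - reader reports a new schema (even with 0 rows)  -> OK_NEW_SCHEMA
  //   - same schema, recordCount > 0                    -> OK
  //   - same schema, 0 rows, iteration may continue     -> advance to the next reader
  //   - same schema, 0 rows, iteration must stop        -> NONE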
  @Override
  public IterOutcome next() {
    if (done) {
      lastOutcome = IterOutcome.NONE;
      return lastOutcome;
    }
    oContext.getStats().startProcessing();
    try {
      return internalNext();
    } catch (OutOfMemoryException ex) {
      clearFieldVectorMap();
      lastOutcome = IterOutcome.STOP;
      throw UserException.memoryError(ex).build(logger);
    } catch (ExecutionSetupException e) {
      if (currentReader != null) {
        try {
          currentReader.close();
        } catch (final Exception e2) {
          logger.error("Close failed for reader " + currentReaderClassName, e2);
        }
      }
      lastOutcome = IterOutcome.STOP;
      throw UserException.internalError(e)
          .addContext("Setup failed for", currentReaderClassName)
          .build(logger);
    } catch (UserException ex) {
      lastOutcome = IterOutcome.STOP;
      throw ex;
    } catch (Exception ex) {
      lastOutcome = IterOutcome.STOP;
      throw UserException.internalError(ex).build(logger);
    } finally {
      oContext.getStats().stopProcessing();
    }
  }

  private void releaseAssets() {
    container.zeroVectors();
  }

  private void clearFieldVectorMap() {
    for (final ValueVector v : mutator.fieldVectorMap().values()) {
      v.clear();
    }
    for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
      v.clear();
    }
  }

  private boolean getNextReaderIfHas() throws ExecutionSetupException {
    if (!readers.hasNext()) {
      return false;
    }
    currentReader = readers.next();
    if (!isRepeatableScan && readers.hasNext()) {
      readers.remove();
    }
    implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
    currentReader.setup(oContext, mutator);
    currentReaderClassName = currentReader.getClass().getSimpleName();
    return true;
  }

  private void addImplicitVectors() {
    try {
      if (!implicitColumnList.isEmpty()) {
        for (String column : implicitColumnList.get(0).keySet()) {
          final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
          mutator.addField(field, NullableVarCharVector.class, true /* implicit field */);
        }
      }
    } catch (SchemaChangeException e) {
      // No exception should be thrown here.
      throw UserException.internalError(e)
          .addContext("Failure while allocating implicit vectors")
          .build(logger);
    }
  }

  private void populateImplicitVectorsAndSetCount() {
    mutator.populateImplicitVectors(implicitValues, recordCount);
    for (Map.Entry<String, ValueVector> entry : mutator.fieldVectorMap().entrySet()) {
      logger.debug("set record count {} for vv {}", recordCount, entry.getKey());
      entry.getValue().getMutator().setValueCount(recordCount);
    }
  }

  @Override
  public SelectionVector2 getSelectionVector2() {
    return null;
  }

  @Override
  public SelectionVector4 getSelectionVector4() {
    throw new UnsupportedOperationException();
  }

  @Override
  public TypedFieldId getValueVectorId(SchemaPath path) {
    return container.getValueVectorId(path);
  }
  @Override
  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
    return container.getValueAccessorById(clazz, ids);
  }

  private void logRecordBatchStats() {
    final int MAX_FQN_LENGTH = 50;

    if (recordCount == 0) {
      return; // NOOP
    }

    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT,
        getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
  }

  /** Might truncate the FQN if it is too long. */
  private String getFQNForLogging(int maxLength) {
    final String FQNKey = "FQN";
    final ValueVector v = mutator.implicitFieldVectorMap.get(FQNKey);

    final Object fqnObj;
    if (v == null
        || v.getAccessor().getValueCount() == 0
        || (fqnObj = ((NullableVarCharVector) v).getAccessor().getObject(0)) == null) {
      return "NA";
    }

    String fqn = fqnObj.toString();
    if (fqn.length() > maxLength) {
      // Keep the trailing maxLength characters (file name rather than path prefix).
      fqn = fqn.substring(fqn.length() - maxLength);
    }
    return fqn;
  }

  /**
   * Row set mutator implementation provided to record readers created by
   * this scan batch. Made visible so that tests can create this mutator
   * without also needing a ScanBatch instance. (This class is really independent
   * of the ScanBatch, but resides here for historical reasons. This is, in turn,
   * the only use of the generated vector readers in the vector package.)
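 *
 * <p>A minimal test-usage sketch (the {@code oContext}, {@code allocator} and
 * {@code container} instances are assumed to come from the test harness, and the
 * checked {@code SchemaChangeException} is elided):
 * <pre>{@code
 * Mutator mutator = new Mutator(oContext, allocator, container);
 * MaterializedField field =
 *     MaterializedField.create("name", Types.optional(MinorType.VARCHAR));
 * NullableVarCharVector vector = mutator.addField(field, NullableVarCharVector.class);
 * assert mutator.isNewSchema(); // first field added, so the schema changed
 * }</pre>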
 */
  @VisibleForTesting
  public static class Mutator implements OutputMutator {
    /** Flag keeping track of whether the top-level schema has changed since the last inquiry
     * (via {@link #isNewSchema}). It is initialized to false, and reset to false after
     * {@link #isNewSchema} or after {@link #clear}, until a new value vector or a value vector
     * with a different type is added to fieldVectorMap. **/
    private boolean schemaChanged;

    /** Regular fields' value vectors indexed by fields' keys. */
    private final CaseInsensitiveMap<ValueVector> regularFieldVectorMap = CaseInsensitiveMap.newHashMap();

    /** Implicit fields' value vectors indexed by fields' keys. */
    private final CaseInsensitiveMap<ValueVector> implicitFieldVectorMap = CaseInsensitiveMap.newHashMap();

    private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
    private final BufferAllocator allocator;
    private final VectorContainer container;
    private final OperatorContext oContext;

    public Mutator(OperatorContext oContext, BufferAllocator allocator, VectorContainer container) {
      this.oContext = oContext;
      this.allocator = allocator;
      this.container = container;
      this.schemaChanged = false;
    }

    public Map<String, ValueVector> fieldVectorMap() {
      return regularFieldVectorMap;
    }

    public Map<String, ValueVector> implicitFieldVectorMap() {
      return implicitFieldVectorMap;
    }

    @Override
    public <T extends ValueVector> T addField(MaterializedField field,
                                              Class<T> clazz) throws SchemaChangeException {
      return addField(field, clazz, false);
    }

    @Override
    public void allocate(int recordCount) {
      for (final ValueVector v : regularFieldVectorMap.values()) {
        AllocationHelper.allocate(v, recordCount, 50, 10);
      }
    }

    /**
     * Reports whether the schema has changed (a field was added or re-added) since
     * the last call to {@link #isNewSchema}. Returns true at the first call.
     */
    @Override
    public boolean isNewSchema() {
      // Check if the top-level schema or any of the deeper map schemas has changed.

      // Note: the callback's getSchemaChangedAndReset() must get called in order
      // to reset it and avoid false reports of schema changes in the future. (Be
      // careful with the short-circuit OR (||) operator.)
      final boolean deeperSchemaChanged = callBack.getSchemaChangedAndReset();
      if (schemaChanged || deeperSchemaChanged) {
        schemaChanged = false;
        return true;
      }
      return false;
    }

    @Override
    public DrillBuf getManagedBuffer() {
      return oContext.getManagedBuffer();
    }

    @Override
    public CallBack getCallBack() {
      return callBack;
    }

    @Override
    public void clear() {
      regularFieldVectorMap.clear();
      implicitFieldVectorMap.clear();
      container.clear();
      schemaChanged = false;
    }

    @SuppressWarnings("resource")
    private <T extends ValueVector> T addField(MaterializedField field,
                                               Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
      Map<String, ValueVector> fieldVectorMap;

      if (isImplicitField) {
        fieldVectorMap = implicitFieldVectorMap;
      } else {
        fieldVectorMap = regularFieldVectorMap;
      }

      if (!isImplicitField && implicitFieldVectorMap.containsKey(field.getName())
          || isImplicitField && regularFieldVectorMap.containsKey(field.getName())) {
        throw new SchemaChangeException(
            String.format(
                "A regular field and an implicit field are not allowed to share the common name %s. "
                + "Either change the regular field name in the datasource, or change the default implicit field names.",
                field.getName()));
      }

      // Check if the field exists.
      ValueVector v = fieldVectorMap.get(field.getName());
      // For the cases when fields have a different scale or precision,
      // a new vector should be used to handle the value correctly.
      if (v == null || !v.getField().getType().equals(field.getType())) {
        // Field does not exist--add it to the map and the output container.
        v = TypeHelper.getNewVector(field, allocator, callBack);
        if (!clazz.isAssignableFrom(v.getClass())) {
          throw new SchemaChangeException(
              String.format(
                  "The class that was provided, %s, does not correspond to the "
                  + "expected vector type of %s.",
                  clazz.getSimpleName(), v.getClass().getSimpleName()));
        }

        final ValueVector old = fieldVectorMap.put(field.getName(), v);
        if (old != null) {
          old.clear();
          container.remove(old);
        }

        container.add(v);
        // Only mark a schema change for regular vectors added to the container;
        // the implicit schema is constant.
        if (!isImplicitField) {
          schemaChanged = true;
        }
      }

      return clazz.cast(v);
    }
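    /**
     * Fills each implicit column's vector with that column's per-reader constant
     * value, repeated {@code recordCount} times.
     */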
    private void populateImplicitVectors(Map<String, String> implicitValues, int recordCount) {
      if (implicitValues != null) {
        for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
          @SuppressWarnings("resource")
          final NullableVarCharVector v = (NullableVarCharVector) implicitFieldVectorMap.get(entry.getKey());
          String val;
          if ((val = entry.getValue()) != null) {
            AllocationHelper.allocate(v, recordCount, val.length());
            final byte[] bytes = val.getBytes();
            for (int j = 0; j < recordCount; j++) {
              v.getMutator().setSafe(j, bytes, 0, bytes.length);
            }
            v.getMutator().setValueCount(recordCount);
          } else {
            AllocationHelper.allocate(v, recordCount, 0);
            v.getMutator().setValueCount(recordCount);
          }
        }
      }
    }
  }

  @Override
  public Iterator<VectorWrapper<?>> iterator() {
    return container.iterator();
  }

  @Override
  public WritableBatch getWritableBatch() {
    return WritableBatch.get(this);
  }

  @Override
  public void close() throws Exception {
    container.clear();
    mutator.clear();
    if (currentReader != null) {
      currentReader.close();
    }
  }

  @Override
  public VectorContainer getOutgoingContainer() {
    throw new UnsupportedOperationException(
        String.format("You should not call getOutgoingContainer() for class %s",
            this.getClass().getCanonicalName()));
  }

  @Override
  public VectorContainer getContainer() {
    return container;
  }

  /**
   * Verify that the list of implicit column values is valid input:
   * - Either the implicit column list is empty;
   * - Or the implicit column list has the same size as the reader list, and the
   *   key set is the same across all the readers.
   * @param numReaders
   * @param implicitColumnList
   * @return true if the implicit column list is valid
   */
  private boolean verifyImplicitColumns(int numReaders, List<Map<String, String>> implicitColumnList) {
    if (implicitColumnList.isEmpty()) {
      return true;
    }

    if (numReaders != implicitColumnList.size()) {
      return false;
    }

    Map<String, String> firstMap = implicitColumnList.get(0);

    for (int i = 1; i < implicitColumnList.size(); i++) {
      Map<String, String> nonFirstMap = implicitColumnList.get(i);

      if (!firstMap.keySet().equals(nonFirstMap.keySet())) {
        return false;
      }
    }

    return true;
  }

  @Override
  public boolean hasFailed() {
    return lastOutcome == IterOutcome.STOP;
  }

  @Override
  public void dump() {
    logger.error("ScanBatch[container={}, currentReader={}, schema={}]",
        container, currentReader, schema);
  }
}