author     Vlad Rozov <vrozov@apache.org>        2018-04-11 10:12:07 -0700
committer  Parth Chandra <parthc@apache.org>     2018-04-17 18:18:57 -0700
commit     931b43ec54bf47dcbb4aa9ae4499f37a8f21b408 (patch)
tree       f3042dd426abd4be5678823b3b06cbfb68b630cd
parent     fdc69783a0f0014e331fe853b623750ba88c34e0 (diff)
DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable instances are active during query cancellation
This closes #1208
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java   13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java     333
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java                          4
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java       11
4 files changed, 221 insertions, 140 deletions
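
The bug addressed here is a use-after-close race: on query cancellation the fragment thread could return from PartitionerDecorator and close the partitioners while the child CustomRunnable tasks were still executing against them. The patch drops the CountDownLatch/Future combination and instead tracks outstanding tasks with an AtomicInteger, parking the fragment thread until the last task counts it down to zero, so the decorator cannot return (and the partitioners cannot be closed) while any task is live. A minimal, self-contained sketch of that coordination pattern, with hypothetical names and no Drill dependencies:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;

    // Minimal sketch of the counter + park/unpark handshake the patch adopts.
    // Hypothetical names; only the coordination pattern mirrors the change.
    public class ParkAwaitSketch {
      public static void main(String[] args) {
        final ExecutorService pool = Executors.newFixedThreadPool(4);
        final Thread waiter = Thread.currentThread();
        final AtomicInteger outstanding = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
          outstanding.incrementAndGet();
          pool.execute(() -> {
            try {
              // ... partition one batch ...
            } finally {
              // The last task to finish wakes the waiting fragment thread.
              if (outstanding.decrementAndGet() == 0) {
                LockSupport.unpark(waiter);
              }
            }
          });
        }
        // Unlike latch.await(), the wait is re-checked in a loop, so spurious
        // wakeups and early unparks are harmless.
        while (outstanding.get() > 0) {
          LockSupport.park();
        }
        pool.shutdown();  // only reached once every task has completed
      }
    }

LockSupport keeps a one-shot permit per thread, so an unpark that arrives before the corresponding park is not lost; combined with re-checking the counter, the waiter can never miss the final completion.
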
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7e7623818..034d6c23d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.partitionsender;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
@@ -167,10 +168,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
} else {
sendEmptyBatch(true);
}
- } catch (IOException e) {
+ } catch (ExecutionException e) {
incoming.kill(false);
logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
- context.getExecutorState().fail(e);
+ context.getExecutorState().fail(e.getCause());
}
return false;
@@ -197,10 +198,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
first = false;
sendEmptyBatch(false);
}
- } catch (IOException e) {
+ } catch (ExecutionException e) {
incoming.kill(false);
logger.error("Error while flushing outgoing batches", e);
- context.getExecutorState().fail(e);
+ context.getExecutorState().fail(e.getCause());
return false;
} catch (SchemaChangeException e) {
incoming.kill(false);
@@ -211,8 +212,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
case OK:
try {
partitioner.partitionBatch(incoming);
- } catch (IOException e) {
- context.getExecutorState().fail(e);
+ } catch (ExecutionException e) {
+ context.getExecutorState().fail(e.getCause());
incoming.kill(false);
return false;
}
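
All three catch blocks above change the same way: partitionBatch and flushOutgoingBatches now declare ExecutionException, and the root exec fails the fragment with e.getCause() so the reported error is the worker's original exception rather than the wrapper. A small self-contained illustration of that wrap-at-the-task, unwrap-at-the-caller convention (names hypothetical):

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    // Worker code wraps any failure in ExecutionException; the caller reports
    // getCause() so logs show the real IOException, not the wrapper.
    public class UnwrapSketch {
      static void doWork() throws ExecutionException {
        try {
          throw new IOException("disk full");   // simulated worker failure
        } catch (IOException e) {
          throw new ExecutionException(e);      // wrap at the task boundary
        }
      }

      public static void main(String[] args) {
        try {
          doWork();
        } catch (ExecutionException e) {
          System.err.println("failing with cause: " + e.getCause()); // unwrap
        }
      }
    }
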
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index bb69f39a1..e9838f79f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -18,10 +18,15 @@
package org.apache.drill.exec.physical.impl.partitionsender;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
@@ -31,7 +36,8 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.testing.CountDownLatchInjection;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Decorator class to hide multiple Partitioner existence from the caller
@@ -41,34 +47,37 @@ import com.google.common.collect.Lists;
* The algorithm to figure out processing versus wait time is based on following formula:
* totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner)
*/
-public class PartitionerDecorator {
+public final class PartitionerDecorator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);
private List<Partitioner> partitioners;
private final OperatorStats stats;
- private final String tName;
- private final String childThreadPrefix;
private final ExecutorService executor;
private final FragmentContext context;
+ private final Thread thread;
+ private final boolean enableParallelTaskExecution;
+ PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
+ this(partitioners, stats, context, partitioners.size() > 1);
+ }
- public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
+ PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context, boolean enableParallelTaskExecution) {
this.partitioners = partitioners;
this.stats = stats;
this.context = context;
- this.executor = context.getExecutor();
- this.tName = Thread.currentThread().getName();
- this.childThreadPrefix = "Partitioner-" + tName + "-";
+ this.enableParallelTaskExecution = enableParallelTaskExecution;
+ executor = enableParallelTaskExecution ? context.getExecutor() : MoreExecutors.newDirectExecutorService();
+ thread = Thread.currentThread();
}
/**
* partitionBatch - decorator method to call real Partitioner(s) to process incoming batch
* uses either threading or not threading approach based on number Partitioners
* @param incoming
- * @throws IOException
+ * @throws ExecutionException
*/
- public void partitionBatch(final RecordBatch incoming) throws IOException {
+ public void partitionBatch(final RecordBatch incoming) throws ExecutionException {
executeMethodLogic(new PartitionBatchHandlingClass(incoming));
}
@@ -76,9 +85,9 @@ public class PartitionerDecorator {
* flushOutgoingBatches - decorator to call real Partitioner(s) flushOutgoingBatches
* @param isLastBatch
* @param schemaChanged
- * @throws IOException
+ * @throws ExecutionException
*/
- public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws IOException {
+ public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws ExecutionException {
executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged));
}
@@ -118,105 +127,115 @@ public class PartitionerDecorator {
return null;
}
- @VisibleForTesting
- protected List<Partitioner> getPartitioners() {
+ List<Partitioner> getPartitioners() {
return partitioners;
}
/**
* Helper to execute the different methods wrapped into same logic
* @param iface
- * @throws IOException
+ * @throws ExecutionException
*/
- protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException {
- if (partitioners.size() == 1 ) {
- // no need for threads
- final OperatorStats localStatsSingle = partitioners.get(0).getStats();
- localStatsSingle.clear();
- localStatsSingle.startProcessing();
+ @VisibleForTesting
+ void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException {
+ // To simulate interruption of main fragment thread and interrupting the partition threads, create a
+ // CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or
+ // interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads.
+ try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
+ testCountDownLatch.initialize(1);
+ final AtomicInteger count = new AtomicInteger();
+ List<PartitionerTask> partitionerTasks = new ArrayList<>(partitioners.size());
+ ExecutionException executionException = null;
+ // start waiting on main stats to adjust by sum(max(processing)) at the end
+ startWait();
try {
- iface.execute(partitioners.get(0));
+ partitioners.forEach(partitioner -> createAndExecute(iface, testCountDownLatch, count, partitionerTasks, partitioner));
+ // Wait for main fragment interruption.
+ injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
+ testCountDownLatch.countDown();
+ } catch (InterruptedException e) {
+ logger.warn("fragment thread interrupted", e);
+ } catch (RejectedExecutionException e) {
+ logger.warn("Failed to execute partitioner tasks. Execution service down?", e);
+ executionException = new ExecutionException(e);
} finally {
- localStatsSingle.stopProcessing();
- stats.mergeMetrics(localStatsSingle);
- // since main stats did not have any wait time - adjust based of partitioner stats wait time
- // main stats processing time started recording in BaseRootExec
- stats.adjustWaitNanos(localStatsSingle.getWaitNanos());
+ await(count, partitionerTasks);
+ stopWait();
+ processPartitionerTasks(partitionerTasks, executionException);
}
- return;
}
+ }
- long maxProcessTime = 0l;
- // start waiting on main stats to adjust by sum(max(processing)) at the end
- stats.startWait();
- final CountDownLatch latch = new CountDownLatch(partitioners.size());
- final List<CustomRunnable> runnables = Lists.newArrayList();
- final List<Future<?>> taskFutures = Lists.newArrayList();
- CountDownLatchInjection testCountDownLatch = null;
- try {
- // To simulate interruption of main fragment thread and interrupting the partition threads, create a
- // CountDownInject patch. Partitioner threads await on the latch and main fragment thread counts down or
- // interrupts waiting threads. This makes sures that we are actually interrupting the blocked partitioner threads.
- testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch");
- testCountDownLatch.initialize(1);
- for (final Partitioner part : partitioners) {
- final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch);
- runnables.add(runnable);
- taskFutures.add(executor.submit(runnable));
+ private void createAndExecute(GeneralExecuteIface iface, CountDownLatchInjection testCountDownLatch, AtomicInteger count,
+ List<PartitionerTask> partitionerTasks, Partitioner partitioner) {
+ PartitionerTask partitionerTask = new PartitionerTask(this, iface, partitioner, count, testCountDownLatch);
+ executor.execute(partitionerTask);
+ partitionerTasks.add(partitionerTask);
+ count.incrementAndGet();
+ }
+
+ /**
+ * Wait for completion of all partitioner tasks.
+ * @param count current number of tasks not yet completed
+ * @param partitionerTasks list of partitioner tasks submitted for execution
+ */
+ private void await(AtomicInteger count, List<PartitionerTask> partitionerTasks) {
+ boolean cancelled = false;
+ while (count.get() > 0) {
+ if (context.getExecutorState().shouldContinue() || cancelled) {
+ LockSupport.park();
+ } else {
+ logger.warn("Cancelling fragment {} partitioner tasks...", context.getFragIdString());
+ partitionerTasks.forEach(partitionerTask -> partitionerTask.cancel(true));
+ cancelled = true;
}
+ }
+ }
- while (true) {
- try {
- // Wait for main fragment interruption.
- injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
-
- // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch.
- injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown();
-
- latch.await();
- break;
- } catch (final InterruptedException e) {
- // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
- if (!context.getExecutorState().shouldContinue()) {
- logger.debug("Interrupting partioner threads. Fragment thread {}", tName);
- for(Future<?> f : taskFutures) {
- f.cancel(true);
- }
+ private void startWait() {
+ if (enableParallelTaskExecution) {
+ stats.startWait();
+ }
+ }
- break;
- }
- }
- }
+ private void stopWait() {
+ if (enableParallelTaskExecution) {
+ stats.stopWait();
+ }
+ }
- IOException excep = null;
- for (final CustomRunnable runnable : runnables ) {
- IOException myException = runnable.getException();
- if ( myException != null ) {
- if ( excep == null ) {
- excep = myException;
- } else {
- excep.addSuppressed(myException);
- }
+ private void processPartitionerTasks(List<PartitionerTask> partitionerTasks, ExecutionException executionException) throws ExecutionException {
+ long maxProcessTime = 0l;
+ for (PartitionerTask partitionerTask : partitionerTasks) {
+ ExecutionException e = partitionerTask.getException();
+ if (e != null) {
+ if (executionException == null) {
+ executionException = e;
+ } else {
+ executionException.getCause().addSuppressed(e.getCause());
}
- final OperatorStats localStats = runnable.getPart().getStats();
- long currentProcessingNanos = localStats.getProcessingNanos();
+ }
+ if (executionException == null) {
+ final OperatorStats localStats = partitionerTask.getStats();
// find out max Partitioner processing time
- maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime;
+ if (enableParallelTaskExecution) {
+ long currentProcessingNanos = localStats.getProcessingNanos();
+ maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime;
+ } else {
+ maxProcessTime += localStats.getWaitNanos();
+ }
stats.mergeMetrics(localStats);
}
- if ( excep != null ) {
- throw excep;
- }
- } finally {
- stats.stopWait();
- // scale down main stats wait time based on calculated processing time
- // since we did not wait for whole duration of above execution
+ }
+ if (executionException != null) {
+ throw executionException;
+ }
+ // scale down main stats wait time based on calculated processing time
+ // since we did not wait for whole duration of above execution
+ if (enableParallelTaskExecution) {
stats.adjustWaitNanos(-maxProcessTime);
-
- // Done with the latch, close it.
- if (testCountDownLatch != null) {
- testCountDownLatch.close();
- }
+ } else {
+ stats.adjustWaitNanos(maxProcessTime);
}
}
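
The await() method introduced in this hunk encodes the heart of the fix: when the fragment should stop, the partitioner tasks are cancelled exactly once, but the loop then goes back to parking until every task has decremented the counter, so control never returns to the caller (who would close the partitioners) while a task is still running. A runnable sketch of that property with a single hypothetical worker:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;

    // Sketch: a cancel request interrupts the worker once, but the waiter
    // keeps parking until the counter reaches zero, so it never returns
    // while the worker is still running. Names are hypothetical.
    public class CancelAwareAwait {
      public static void main(String[] args) {
        final Thread waiter = Thread.currentThread();
        final AtomicInteger outstanding = new AtomicInteger(1);
        Thread worker = new Thread(() -> {
          try {
            Thread.sleep(60_000);              // stands in for partitioning work
          } catch (InterruptedException e) {
            System.out.println("worker: interrupted, cleaning up");
          } finally {
            if (outstanding.decrementAndGet() == 0) {
              LockSupport.unpark(waiter);      // wake the waiter only when done
            }
          }
        });
        worker.start();
        boolean cancelled = false;             // simulates shouldContinue() == false
        while (outstanding.get() > 0) {
          if (cancelled) {
            LockSupport.park();                // still wait for full completion
          } else {
            worker.interrupt();                // request cancellation exactly once
            cancelled = true;
          }
        }
        System.out.println("waiter: safe to close resources now");
      }
    }

In the patch itself the cancel branch is guarded by context.getExecutorState().shouldContinue(), so a healthy fragment simply parks until normal completion.
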
@@ -237,7 +256,7 @@ public class PartitionerDecorator {
private final RecordBatch incoming;
- public PartitionBatchHandlingClass(RecordBatch incoming) {
+ PartitionBatchHandlingClass(RecordBatch incoming) {
this.incoming = incoming;
}
@@ -268,62 +287,116 @@ public class PartitionerDecorator {
}
/**
- * Helper class to wrap Runnable with customized naming
- * Exception handling
+ * Helper class to wrap Runnable with cancellation and waiting for completion support
*
*/
- private static class CustomRunnable implements Runnable {
+ private static class PartitionerTask implements Runnable {
+
+ private enum STATE {
+ NEW,
+ COMPLETING,
+ NORMAL,
+ EXCEPTIONAL,
+ CANCELLED,
+ INTERRUPTING,
+ INTERRUPTED
+ }
+
+ private final AtomicReference<STATE> state;
+ private final AtomicReference<Thread> runner;
+ private final PartitionerDecorator partitionerDecorator;
+ private final AtomicInteger count;
- private final String parentThreadName;
- private final CountDownLatch latch;
private final GeneralExecuteIface iface;
- private final Partitioner part;
+ private final Partitioner partitioner;
private CountDownLatchInjection testCountDownLatch;
- private volatile IOException exp;
+ private volatile ExecutionException exception;
- public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface,
- final Partitioner part, CountDownLatchInjection testCountDownLatch) {
- this.parentThreadName = parentThreadName;
- this.latch = latch;
+ public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
+ state = new AtomicReference<>(STATE.NEW);
+ runner = new AtomicReference<>();
+ this.partitionerDecorator = partitionerDecorator;
this.iface = iface;
- this.part = part;
+ this.partitioner = partitioner;
+ this.count = count;
this.testCountDownLatch = testCountDownLatch;
}
@Override
public void run() {
- // Test only - Pause until interrupted by fragment thread
- try {
- testCountDownLatch.await();
- } catch (final InterruptedException e) {
- logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e);
- }
-
- final Thread currThread = Thread.currentThread();
- final String currThreadName = currThread.getName();
- final OperatorStats localStats = part.getStats();
- try {
- final String newThreadName = parentThreadName + currThread.getId();
- currThread.setName(newThreadName);
+ final Thread thread = Thread.currentThread();
+ if (runner.compareAndSet(null, thread)) {
+ final String name = thread.getName();
+ thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
+ final OperatorStats localStats = partitioner.getStats();
localStats.clear();
localStats.startProcessing();
- iface.execute(part);
- } catch (IOException e) {
- exp = e;
- } finally {
- localStats.stopProcessing();
- currThread.setName(currThreadName);
- latch.countDown();
+ ExecutionException executionException = null;
+ try {
+ // Test only - Pause until interrupted by fragment thread
+ testCountDownLatch.await();
+ if (state.get() == STATE.NEW) {
+ iface.execute(partitioner);
+ }
+ } catch (InterruptedException e) {
+ if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
+ logger.warn("Partitioner Task interrupted during the run", e);
+ }
+ } catch (Throwable t) {
+ executionException = new ExecutionException(t);
+ } finally {
+ if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
+ if (executionException == null) {
+ localStats.stopProcessing();
+ state.lazySet(STATE.NORMAL);
+ } else {
+ exception = executionException;
+ state.lazySet(STATE.EXCEPTIONAL);
+ }
+ }
+ if (count.decrementAndGet() == 0) {
+ LockSupport.unpark(partitionerDecorator.thread);
+ }
+ thread.setName(name);
+ while (state.get() == STATE.INTERRUPTING) {
+ Thread.yield();
+ }
+ // Clear interrupt flag
+ Thread.interrupted();
+ }
+ }
+ }
+
+ void cancel(boolean mayInterruptIfRunning) {
+ Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread,
+ String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName()));
+ if (runner.compareAndSet(null, partitionerDecorator.thread)) {
+ if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this);
+ }
+ count.decrementAndGet();
+ } else {
+ if (mayInterruptIfRunning) {
+ if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTING)) {
+ try {
+ runner.get().interrupt();
+ } finally {
+ state.lazySet(STATE.INTERRUPTED);
+ }
+ }
+ } else {
+ state.compareAndSet(STATE.NEW, STATE.CANCELLED);
+ }
}
}
- public IOException getException() {
- return this.exp;
+ public ExecutionException getException() {
+ return this.exception;
}
- public Partitioner getPart() {
- return part;
+ public OperatorStats getStats() {
+ return partitioner.getStats();
}
}
- }
+}
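
PartitionerTask replaces Future-based cancellation with an explicit state machine whose state names follow the design of java.util.concurrent.FutureTask: cancel() may interrupt the runner only if it wins the NEW -> INTERRUPTING transition, and the runner spins until any in-flight cancellation has reached INTERRUPTED before clearing the interrupt flag, so a late interrupt can never leak into whatever the pooled thread executes next. A condensed sketch of just that handshake (hypothetical names; only the CAS discipline mirrors the code above):

    import java.util.concurrent.atomic.AtomicReference;

    // CAS-guarded interrupt handshake, in the style of FutureTask.
    public class InterruptHandshake {
      enum State { NEW, COMPLETING, NORMAL, INTERRUPTING, INTERRUPTED }

      private final AtomicReference<State> state = new AtomicReference<>(State.NEW);

      // Canceller side: interrupt only if we win the NEW -> INTERRUPTING race.
      void cancel(Thread runner) {
        if (state.compareAndSet(State.NEW, State.INTERRUPTING)) {
          try {
            runner.interrupt();
          } finally {
            state.lazySet(State.INTERRUPTED);
          }
        }
      }

      // Runner side, at the end of run(): claim completion, then absorb any
      // in-flight interrupt before the thread is returned to the pool.
      void finish() {
        if (state.compareAndSet(State.NEW, State.COMPLETING)) {
          state.lazySet(State.NORMAL);  // completed without interference
        }
        while (state.get() == State.INTERRUPTING) {
          Thread.yield();               // wait out a cancel that already started
        }
        Thread.interrupted();           // clear a flag set by a late interrupt
      }
    }
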
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
index 957d79b9f..4103ee62c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.testing;
+import org.apache.drill.common.AutoCloseables.Closeable;
+
/**
* This class is used internally for tracking injected countdown latches. These latches are specified via
* {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option.
@@ -25,7 +27,7 @@ package org.apache.drill.exec.testing;
* with the expected number of countdown and awaits. The child threads count down on the same latch (same site class
* and same descriptor), and once there are enough, the parent thread continues.
*/
-public interface CountDownLatchInjection {
+public interface CountDownLatchInjection extends Closeable {
/**
* Initializes the underlying latch
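
Extending Drill's Closeable lets the injected latch participate in try-with-resources, which is how the new executeMethodLogic manages it; the manual null-check-and-close in the old finally block disappears. A sketch of the shape this enables, assuming (as the import above suggests) that Drill's Closeable narrows AutoCloseable to a close() without checked exceptions:

    // Interface shape assumed for illustration; names are hypothetical.
    interface LatchInjection extends AutoCloseable {
      void initialize(int count);
      void countDown();
      @Override
      void close();  // narrowed: no checked exception, matching Drill's Closeable
    }

    class LatchScope {
      static void runGuarded(LatchInjection latch) {
        // The latch is released on every exit path, even if the body throws,
        // replacing the manual "if (latch != null) latch.close()" cleanup.
        try (LatchInjection l = latch) {
          l.initialize(1);
          // ... start tasks that await the latch ...
          l.countDown();
        }
      }
    }
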
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index c431fea74..eaaa87d89 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -28,6 +28,7 @@ import java.io.PrintWriter;
import java.nio.file.Paths;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.ExecutionException;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.PlanTestBase;
@@ -68,6 +69,8 @@ import org.apache.drill.exec.server.options.OptionValue.AccessibleScopes;
import org.apache.drill.exec.server.options.OptionValue.OptionScope;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.test.OperatorFixture.MockExecutorState;
+
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -221,6 +224,7 @@ public class TestPartitionSender extends PlanTestBase {
FragmentContextImpl context = null;
try {
context = new FragmentContextImpl(drillbitContext, planFragment, null, registry);
+ context.setExecutorState(new MockExecutorState());
final int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
final HashPartitionSender partSender = new HashPartitionSender(majorFragmentId, hashToRandomExchange, hashToRandomExchange.getExpression(), mfEndPoints);
partionSenderRootExec = new MockPartitionSenderRootExec(context, incoming, partSender);
@@ -285,8 +289,8 @@ public class TestPartitionSender extends PlanTestBase {
partionSenderRootExec.getStats().startProcessing();
try {
partDecor.executeMethodLogic(new InjectExceptionTest());
- fail("Should throw IOException here");
- } catch (IOException ioe) {
+ fail("executeMethodLogic should throw an exception.");
+ } catch (ExecutionException e) {
final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder();
partionSenderRootExec.getStats().addAllMetrics(oPBuilder);
final List<MetricValue> metrics = oPBuilder.getMetricList();
@@ -298,7 +302,8 @@ public class TestPartitionSender extends PlanTestBase {
assertEquals(actualThreads, metric.getLongValue());
}
}
- assertEquals(actualThreads-1, ioe.getSuppressed().length);
+ assertTrue(e.getCause() instanceof IOException);
+ assertEquals(actualThreads-1, e.getCause().getSuppressed().length);
} finally {
partionSenderRootExec.getStats().stopProcessing();
}
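
The updated assertions depend on how processPartitionerTasks aggregates failures: the first task's exception becomes the cause of the thrown ExecutionException, and every subsequent failure is attached to that cause via addSuppressed(), so N failing partitioner tasks surface as one cause plus N-1 suppressed exceptions. A self-contained sketch of that arithmetic:

    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    // Aggregation the test's assertions rely on: one cause, N-1 suppressed.
    public class SuppressedSketch {
      public static void main(String[] args) {
        int tasks = 4;
        ExecutionException combined = null;
        for (int i = 0; i < tasks; i++) {
          ExecutionException e = new ExecutionException(new IOException("task " + i));
          if (combined == null) {
            combined = e;                               // first failure wins
          } else {
            combined.getCause().addSuppressed(e.getCause());
          }
        }
        System.out.println(combined.getCause() instanceof IOException);   // true
        System.out.println(combined.getCause().getSuppressed().length);   // 3, i.e. tasks - 1
      }
    }
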