aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2018-06-05 01:08:55 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2018-06-05 01:08:55 -0700
commit2c2a86b5d5be6f77ee72d16f990b39ae59f479b9 (patch)
treebd9316c159cbe8afa19c224aceaa0a3804a100ce
parente8c1a0c2fdb09a628d9cc925676af870d5a7a946 (diff)
[SPARK-24453][SS] Fix error recovering from the failure in a no-data batch
## What changes were proposed in this pull request? The error occurs when we are recovering from a failure in a no-data batch (say X) that has been planned (i.e. written to offset log) but not executed (i.e. not written to commit log). Upon recovery the following sequence of events happen. 1. `MicroBatchExecution.populateStartOffsets` sets `currentBatchId` to X. Since there was no data in the batch, the `availableOffsets` is same as `committedOffsets`, so `isNewDataAvailable` is `false`. 2. When `MicroBatchExecution.constructNextBatch` is called, ideally it should immediately return true because the next batch has already been constructed. However, the check of whether the batch has been constructed was `if (isNewDataAvailable) return true`. Since the planned batch is a no-data batch, it escaped this check and proceeded to plan the same batch X *once again*. The solution is to have an explicit flag that signifies whether a batch has already been constructed or not. `populateStartOffsets` is going to set the flag appropriately. ## How was this patch tested? new unit test Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #21491 from tdas/SPARK-24453.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala38
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala71
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala2
3 files changed, 98 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7817360810..17ffa2a517 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -127,6 +127,12 @@ class MicroBatchExecution(
}
/**
+ * Signifies whether current batch (i.e. for the batch `currentBatchId`) has been constructed
+ * (i.e. written to the offsetLog) and is ready for execution.
+ */
+ private var isCurrentBatchConstructed = false
+
+ /**
* Signals to the thread executing micro-batches that it should stop running after the next
* batch. This method blocks until the thread stops running.
*/
@@ -154,7 +160,6 @@ class MicroBatchExecution(
triggerExecutor.execute(() => {
if (isActive) {
- var currentBatchIsRunnable = false // Whether the current batch is runnable / has been run
var currentBatchHasNewData = false // Whether the current batch had new data
startTrigger()
@@ -175,7 +180,9 @@ class MicroBatchExecution(
// new data to process as `constructNextBatch` may decide to run a batch for
// state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
// is available or not.
- currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
+ if (!isCurrentBatchConstructed) {
+ isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
+ }
// Remember whether the current batch has data or not. This will be required later
// for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
@@ -183,7 +190,7 @@ class MicroBatchExecution(
currentBatchHasNewData = isNewDataAvailable
currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
- if (currentBatchIsRunnable) {
+ if (isCurrentBatchConstructed) {
if (currentBatchHasNewData) updateStatusMessage("Processing new data")
else updateStatusMessage("No new data but cleaning up state")
runBatch(sparkSessionForStream)
@@ -194,9 +201,12 @@ class MicroBatchExecution(
finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
- // If the current batch has been executed, then increment the batch id, else there was
- // no data to execute the batch
- if (currentBatchIsRunnable) currentBatchId += 1 else Thread.sleep(pollingDelayMs)
+ // If the current batch has been executed, then increment the batch id and reset flag.
+ // Otherwise, there was no data to execute the batch and sleep for some time
+ if (isCurrentBatchConstructed) {
+ currentBatchId += 1
+ isCurrentBatchConstructed = false
+ } else Thread.sleep(pollingDelayMs)
}
updateStatusMessage("Waiting for next trigger")
isActive
@@ -231,6 +241,7 @@ class MicroBatchExecution(
/* First assume that we are re-executing the latest known batch
* in the offset log */
currentBatchId = latestBatchId
+ isCurrentBatchConstructed = true
availableOffsets = nextOffsets.toStreamProgress(sources)
/* Initialize committed offsets to a committed batch, which at this
* is the second latest batch id in the offset log. */
@@ -269,6 +280,7 @@ class MicroBatchExecution(
// here, so we do nothing here.
}
currentBatchId = latestCommittedBatchId + 1
+ isCurrentBatchConstructed = false
committedOffsets ++= availableOffsets
// Construct a new batch be recomputing availableOffsets
} else if (latestCommittedBatchId < latestBatchId - 1) {
@@ -313,11 +325,8 @@ class MicroBatchExecution(
* - If either of the above is true, then construct the next batch by committing to the offset
* log that range of offsets that the next batch will process.
*/
- private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = withProgressLocked {
- // If new data is already available that means this method has already been called before
- // and it must have already committed the offset range of next batch to the offset log.
- // Hence do nothing, just return true.
- if (isNewDataAvailable) return true
+ private def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean = withProgressLocked {
+ if (isCurrentBatchConstructed) return true
// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
@@ -348,9 +357,14 @@ class MicroBatchExecution(
batchTimestampMs = triggerClock.getTimeMillis())
// Check whether next batch should be constructed
- val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
+ val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
+ logTrace(
+ s"noDataBatchesEnabled = $noDataBatchesEnabled, " +
+ s"lastExecutionRequiresAnotherBatch = $lastExecutionRequiresAnotherBatch, " +
+ s"isNewDataAvailable = $isNewDataAvailable, " +
+ s"shouldConstructNextBatch = $shouldConstructNextBatch")
if (shouldConstructNextBatch) {
// Commit the next batch offset range to the offset log
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
new file mode 100644
index 0000000000..c228740df0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
+
+ import testImplicits._
+
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("SPARK-24156: do not plan a no-data batch again after it has already been planned") {
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(df)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15), // Set watermark to 5
+ CheckAnswer(),
+ AddData(inputData, 25), // Set watermark to 15 to make MicroBatchExecution run no-data batch
+ CheckAnswer((10, 5)), // Last batch should be a no-data batch
+ StopStream,
+ Execute { q =>
+ // Delete the last committed batch from the commit log to signify that the last batch
+ // (a no-data batch) never completed
+ val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+ q.commitLog.purgeAfter(commit - 1)
+ },
+ // Add data before start so that MicroBatchExecution can plan a batch. It should not,
+ // it should first re-run the incomplete no-data batch and then run a new batch to process
+ // new data.
+ AddData(inputData, 30),
+ StartStream(),
+ CheckNewAnswer((15, 1)), // This should not throw the error reported in SPARK-24156
+ StopStream,
+ Execute { q =>
+ // Delete the entire commit log
+ val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+ q.commitLog.purge(commit + 1)
+ },
+ AddData(inputData, 50),
+ StartStream(),
+ CheckNewAnswer((25, 1), (30, 1)) // This should not throw the error reported in SPARK-24156
+ )
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index f348dac131..4c3fd58cb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -292,7 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
/** Execute arbitrary code */
object Execute {
def apply(func: StreamExecution => Any): AssertOnQuery =
- AssertOnQuery(query => { func(query); true })
+ AssertOnQuery(query => { func(query); true }, "Execute")
}
object AwaitEpoch {