[SPARK-36109][SS][TEST] Check data after adding data to topic in KafkaSourceStressSuite

### What changes were proposed in this pull request?

This patch proposes to check data right after adding data to a topic in `KafkaSourceStressSuite`.

### Why are the changes needed?

The test logic in `KafkaSourceStressSuite` is not stable; for an example failure, see https://github.com/apache/spark/runs/3049244904.

If we add data to a topic and then delete the topic before checking the data, the expected answer differs from the data retrieved from the sink.
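
The ordering problem can be sketched with a toy model. This is not Spark or Kafka code: `CheckAfterAddSketch`, `AddData`, `DeleteTopic`, `Check`, and `run` are hypothetical names introduced only for illustration, and the model assumes that records still unread when a topic is deleted never reach the sink.

```scala
import scala.collection.mutable

// Toy model only: none of these names exist in Spark or Kafka.
object CheckAfterAddSketch {
  sealed trait Action
  final case class AddData(topic: String, values: Seq[Int]) extends Action
  final case class DeleteTopic(topic: String) extends Action
  case object Check extends Action

  /** Runs the actions in order; returns true if every Check sees sink == expected. */
  def run(actions: Seq[Action]): Boolean = {
    val pending = mutable.Map.empty[String, Vector[Int]] // unread records per topic
    val sink = mutable.ArrayBuffer.empty[Int]            // what the query has emitted
    val expected = mutable.ArrayBuffer.empty[Int]        // what the test expects to see
    var ok = true

    actions.foreach {
      case AddData(topic, values) =>
        pending(topic) = pending.getOrElse(topic, Vector.empty) ++ values
        expected ++= values
      case DeleteTopic(topic) =>
        // Assumption of this model: unread records are lost with the topic.
        pending.remove(topic)
      case Check =>
        // The query drains whatever is still readable, then we compare.
        pending.keys.toList.foreach { t =>
          sink ++= pending(t)
          pending(t) = Vector.empty
        }
        ok = ok && sink.sorted == expected.sorted
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    val checkAtEnd = Seq(AddData("t1", Seq(1, 2)), DeleteTopic("t1"), Check)
    val checkAfterAdd = Seq(AddData("t1", Seq(1, 2)), Check, DeleteTopic("t1"), Check)
    println(s"check only at the end: ${run(checkAtEnd)}")    // false
    println(s"check right after add: ${run(checkAfterAdd)}") // true
  }
}
```

In this model, placing a `Check` directly after each `AddData` lets the sink catch up before any later action can remove the topic, which is the ordering this patch enforces in the stress test.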

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes apache#33311 from viirya/stream-assert.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
viirya authored and dongjoon-hyun committed Jul 13, 2021
1 parent c46342e commit 201566c
Showing 2 changed files with 8 additions and 3 deletions.
@@ -2429,8 +2429,9 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
 (d, running) => {
   Random.nextInt(5) match {
     case 0 => // Add a new topic
-      topics = topics ++ Seq(newStressTopic)
-      AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic",
+      val newTopic = newStressTopic
+      topics = topics ++ Seq(newTopic)
+      AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newTopic",
         topicAction = (topic, partition) => {
           if (partition.isEmpty) {
             testUtils.createTopic(topic, partitions = nextInt(1, 6))
@@ -283,7 +283,7 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with

 /** Assert that a condition on the active query is true */
 class AssertOnQuery(val condition: StreamExecution => Boolean, val message: String)
-  extends StreamAction {
+  extends StreamAction with StreamMustBeRunning {
   override def toString: String = s"AssertOnQuery(<condition>, $message)"
 }

@@ -871,6 +871,10 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with

 case r if r < 0.7 => // AddData
   addRandomData()
+  // In some suites, e.g. `KafkaSourceStressSuite`, we delete Kafka topic in the
+  // `addData` closure. In the case, the topic with added data might be deleted
+  // before next check. So we must check data after adding data here.
+  addCheck()

 case _ => // StopStream
   addCheck()
