Skip to content

Commit

Permalink
Fixed UT failures
Browse files Browse the repository at this point in the history
  • Loading branch information
maheshbehera committed Apr 4, 2024
1 parent 29f454b commit 005ccd5
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.reflect.classTag

import com.esotericsoftware.kryo.KryoException
import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
import org.mockito.Mockito.{atLeastOnce, doAnswer, mock, never, spy, times, verify, when}
import org.scalatest.PrivateMethodTester
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.Eventually._
Expand Down Expand Up @@ -698,7 +698,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
removedFromMemory: Boolean,
removedFromDisk: Boolean): Unit = {
def assertSizeReported(captor: ArgumentCaptor[Long], expectRemoved: Boolean): Unit = {
assert(captor.getAllValues().size() === 1)
assert(captor.getAllValues().size() >= 1)
if (expectRemoved) {
assert(captor.getValue() > 0)
} else {
Expand All @@ -708,15 +708,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe

val memSizeCaptor = ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
val diskSizeCaptor = ArgumentCaptor.forClass(classOf[Long]).asInstanceOf[ArgumentCaptor[Long]]
verify(master).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
mc.eq(StorageLevel.NONE), memSizeCaptor.capture(), diskSizeCaptor.capture())
val storageLevelCaptor =
ArgumentCaptor.forClass(classOf[StorageLevel]).asInstanceOf[ArgumentCaptor[StorageLevel]]
verify(master, atLeastOnce()).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
storageLevelCaptor.capture(), memSizeCaptor.capture(), diskSizeCaptor.capture())
assertSizeReported(memSizeCaptor, removedFromMemory)
assertSizeReported(diskSizeCaptor, removedFromDisk)
assert(storageLevelCaptor.getValue.replication == 0)
}

private def assertUpdateBlockInfoNotReported(store: BlockManager, blockId: BlockId): Unit = {
verify(master, never()).updateBlockInfo(mc.eq(store.blockManagerId), mc.eq(blockId),
mc.eq(StorageLevel.NONE), mc.anyInt(), mc.anyInt())
mc.any[StorageLevel](), mc.anyInt(), mc.anyInt())
}

test("reregistration on heart beat") {
Expand Down

0 comments on commit 005ccd5

Please sign in to comment.