
Conversation

@vanzin vanzin commented Sep 12, 2019

Previously, the RDD level would change depending on the status reported
by executors for the blocks they were storing, and individual blocks would
reflect that. That is wrong because different blocks may be stored differently
in different executors.

So now the RDD tracks the user-provided storage level, while the individual
partitions reflect the current storage level of that particular block,
including the current number of replicas.
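To illustrate the distinction (a minimal sketch, not the actual LiveEntity code; names and structure are simplified):

import scala.collection.mutable
import org.apache.spark.storage.StorageLevel

// Sketch only: the RDD keeps the level the user asked for via persist(),
// while each partition records the level its block actually has right now,
// which can differ per executor (e.g. after eviction to disk) and includes
// the current number of replicas.
class RddStorageView(val requestedLevel: StorageLevel) {
  // split index -> storage level currently reported for that block
  val effectiveLevels = mutable.Map[Int, StorageLevel]()
}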

SparkQA commented Sep 12, 2019

Test build #110542 has finished for PR 25779 at commit eadc487.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 13, 2019

Test build #110543 has finished for PR 25779 at commit 0f1a996.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

vanzin commented Sep 25, 2019

Ping? Anyone? Bueller?

vanzin commented Sep 26, 2019

Let's try... @squito @attilapiros

SparkQA commented Sep 27, 2019

Test build #111456 has finished for PR 25779 at commit effcbb5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 27, 2019

Test build #111489 has finished for PR 25779 at commit d2c5ed4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class _GaussianMixtureParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
  • class GaussianMixtureModel(JavaModel, _GaussianMixtureParams, JavaMLWritable, JavaMLReadable,
  • class GaussianMixture(JavaEstimator, _GaussianMixtureParams, JavaMLWritable, JavaMLReadable):
  • class _KMeansParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol, HasTol,
  • class KMeansModel(JavaModel, _KMeansParams, GeneralJavaMLWritable, JavaMLReadable,
  • class KMeans(JavaEstimator, _KMeansParams, JavaMLWritable, JavaMLReadable):
  • class _BisectingKMeansParams(HasMaxIter, HasFeaturesCol, HasSeed, HasPredictionCol,
  • class BisectingKMeansModel(JavaModel, _BisectingKMeansParams, JavaMLWritable, JavaMLReadable,
  • class BisectingKMeans(JavaEstimator, _BisectingKMeansParams, JavaMLWritable, JavaMLReadable):
  • class _LDAParams(HasMaxIter, HasFeaturesCol, HasSeed, HasCheckpointInterval):
  • class LDAModel(JavaModel, _LDAParams):
  • class LDA(JavaEstimator, _LDAParams, JavaMLReadable, JavaMLWritable):
  • class _PowerIterationClusteringParams(HasMaxIter, HasWeightCol):
  • class PowerIterationClustering(_PowerIterationClusteringParams, JavaParams, JavaMLReadable,

// unpersisted. First level key is the RDD id, second is the split index.
//
// This is a var so that we can "clear" it and restart with a new map when it becomes empty,
// to save some memory.
Contributor

can you change this to a scaladoc comment? (for tooltips in my IDE)

assert(exec.usedOnHeap === 1)
assert(exec.usedOffHeap === 0)

val old1 = exec.addBlock(RDDBlockId(1, 1), 3L, 3L, false)
Contributor

What's the scenario where you'd get another update for the same block without a removal event in the middle? Just if there is a dropped event?

Contributor Author

IIRC, if a block is evicted from memory to disk, for example, and then later loaded back into memory.
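For illustration, the evict-then-reload case could produce a sequence of listener events roughly like this (hypothetical sizes and ids, written in the style of the listener tests elsewhere in this PR; it glosses over the delta-vs-current size semantics discussed further down):

// 1. Block first cached in memory.
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.MEMORY_AND_DISK, 3L, 0L)))
// 2. Evicted from memory to disk under pressure; no removal event is sent.
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0L, 3L)))
// 3. Later read back into memory, so the same block is updated again.
listener.onBlockUpdated(SparkListenerBlockUpdated(
  BlockUpdatedInfo(bm1, RDDBlockId(1, 1), StorageLevel.MEMORY_AND_DISK, 3L, 3L)))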

Contributor

ah good point. worth a comment? that case usually slips my mind, anyway

assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

val part = wrapper.info.partitions.get(0)
assert(part.memoryUsed === rdd1b1.memSize * 2)
Contributor

Unrelated to your change, but for the line just above (wrapper.info.partitions.get(0)), would it be better to have wrapper.info.partitions.get.find(_.blockName == rdd1b1.blockId.name)? Or at least an assert after that, that you have the right block? Otherwise it will lead to confusing errors later if somebody changes the test and rdd1b1 isn't added first.

Also unrelated to your change, but coming back to this code after a long time I was a little surprised that info.partitions isn't ordered by partition; that may also be worth a comment somewhere.

Contributor Author

Sure, I can add an assert.

The list is not ordered for performance reasons. See RDDPartitionSeq in LiveEntity.scala.

Contributor Author

(actually the assert / change is not needed since a few lines above there's an assertion that the list has a single element.)

Contributor

ok fair enough.

I did realize why RDDPartitionSeq is the way it is after a closer look; I was just mentioning that it's not so obvious, or commented in the places I expected (the API just exposes a Seq[RDDPartitionInfo]).

}
}
}
update(rdd, now)
Contributor

It looks like you went from conditionally updating the RDD to always updating it. Was that intentional? If so, why?

Contributor Author

Hmm, not sure where you're seeing the conditional update? This line is executed for all live RDDs in the old code.

Contributor

I guess the update in this line was unnecessary?

if (rdd.removeDistribution(exec)) {
update(rdd, now)
}

I think this is the update which caused the confusion for @squito too.

Contributor

Right, it was that part, combined with misreading this part of the old code. Though it seems like it could be a conditional update, that's no change from before.

SparkQA commented Sep 28, 2019

Test build #111504 has finished for PR 25779 at commit f6a539a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

vanzin commented Sep 30, 2019

retest this please

SparkQA commented Sep 30, 2019

Test build #111611 has started for PR 25779 at commit f6a539a.

@attilapiros attilapiros left a comment

For now this was just a quick pass through the changes, and I would like to read it more carefully tomorrow (CET time).

* This is a var so that we can "clear" it and restart with a new map when it becomes empty,
* to save some memory.
*/
private var rddBlockUsage = new OpenHashMap[Int, RDDBlockTracker]()
Contributor

This can be a val

Contributor Author

No. You also missed the comment on the very previous line.
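To spell out the pattern being referenced (a hypothetical sketch, assuming it lives in Spark's own package where OpenHashMap is visible, and using a placeholder for the PR's RDDBlockTracker; not the exact code from this change):

import org.apache.spark.util.collection.OpenHashMap

class RDDBlockTrackerPlaceholder

class BlockUsageSketch {
  // Must be a var: "clearing" is done by replacing the whole map with a fresh,
  // small one once nothing is tracked anymore, so the old map can be GC'd.
  private var rddBlockUsage = new OpenHashMap[Int, RDDBlockTrackerPlaceholder]()

  // Hypothetical helper illustrating the reassignment that a val could not do.
  def resetIfEmpty(trackedRdds: Int): Unit = {
    if (trackedRdds == 0) {
      rddBlockUsage = new OpenHashMap[Int, RDDBlockTrackerPlaceholder]()
    }
  }
}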

Contributor

Yes, I missed that one. It is strange that the map is not clearable, but in this case it is fine this way.


@squito squito left a comment

lgtm


private val partitions = new HashMap[String, LiveRDDPartition]()
private val partitions = new OpenHashMap[Int, LiveRDDPartition]()
private val partitionSeq = new RDDPartitionSeq()
private var partitionCount = 0
Contributor

Why is a separate partitionCount needed? partitionSeq has a length method which tracks partition adds/removes.

// Remove block 1 from bm 1.
listener.onBlockUpdated(SparkListenerBlockUpdated(
BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize)))
BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0L, 0L)))
Contributor

I assume this was a mistake in the test, as the delta was reported here rather than the currently used memory and disk sizes. That is why this line is changed.

SparkQA commented Sep 30, 2019

Test build #111623 has finished for PR 25779 at commit 055da38.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HeartSaVioR commented Sep 30, 2019

We might need to mention the change to LiveEntity on the patch in #25943; otherwise we might forget to include it in the KVStore snapshot.

So according to the javadoc, BlockManager.reportBlockStatus is intended to report the current storage status of a block, which is clear, except in the case of BlockManager.dropFromMemory. There it also receives droppedMemorySize and lets the master deal with information about both sizes (memory/disk), which is really confusing (it made me feel that the event contains a "delta") and doesn't match the explanation that "block managers report the current mem and disk used by the block".

Could we remove the exceptional case as well, or is there a technical reason to leave it as it is?

vanzin commented Oct 1, 2019

There it also receives droppedMemorySize and lets the master deal with information about both sizes (memory/disk), which is really confusing

Yeah, I'll have to take a second look at that code, run some more experiments. That's a really awkward API... let me see if there's a way to end up with the correct info without having to change the block manager code.

vanzin commented Oct 1, 2019

I'll take a closer look tomorrow, but a quick experiment tells me that:

  • if the block is dropped from memory while adding another block, the update contains a storage level with "useMemory = false", and the correct memory size that was dropped
  • if the block is dropped via BlockManager.removeBlockInternal, which is triggered by unpersisting an RDD, then the block update sent to the master does not contain the previous size, for either disk or memory.

The thing with the second case is that the code path that triggers it doesn't send an update to the master. I'm actually working on a feature where that would happen, though.

But given that, I'll probably undo some of the changes here, mainly relying on the fact that the block update contains the delta for the memory, and that disk blocks are never dropped outside of unpersist events.

For my WIP changes (for which I'll eventually post a PR) I'll look at reporting the correct sizes when dropping blocks in the second case above.

vanzin commented Oct 1, 2019

(BTW it could be that the problem in the second bullet above is fixed by #25973. I'll look more carefully tomorrow.)

HeartSaVioR commented Oct 1, 2019

if the block is dropped via BlockManager.removeBlockInternal, which is triggered by unpersisting an RDD, then the block update sent to the master does not contain the previous size, for either disk or memory.

Right, #25973 is for dealing with this, though the original target of #25973 was broadcast.

So if we would like to just allow two different semantics of "size" on UpdateBlockInfo (either "addition" or "removal" depending on storage level), I guess #25973 is correct. If we feel the current logic is awkward and only want to report the "current" status of a block, we may have to make a non-trivial change, and then #25973 will no longer be correct.

vanzin commented Oct 1, 2019

Ok, running some tests I found a couple of extra bugs. I'll file a new bug with the details, and then update this one with the fix for SPARK-27468 specifically.

After my investigations, #25973 seems correct, but it does not fix the new bugs that I found in my testing.

vanzin commented Oct 1, 2019

Filed SPARK-29319. I'll update this with the fixes that aren't related to that bug.

@vanzin vanzin changed the title [SPARK-27468][core] Track correct storage level and mem/disk usage for RDDs. [SPARK-27468][core] Track correct storage level of RDDs and partitions. Oct 1, 2019
SparkQA commented Oct 1, 2019

Test build #111652 has finished for PR 25779 at commit 0848fbf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

vanzin commented Oct 3, 2019

Ping

* Tracker for data related to a persisted RDD. Note the storage level is kept immutable here,
* following the current behavior of `RDD.persist()`, even though it is mutable in the `RDDInfo`
* structure.
*/
Contributor

A stage may have been submitted before the RDD was persisted at all, then another stage submitted after the RDD is persisted, so it's not actually immutable. You wouldn't properly capture that here.

(probably not the optimal thing for the user to do, but I've seen weirder things ...)

Contributor Author

Actually that works fine, because the listener does not track RDDs that are not persisted, so there wouldn't be a live RDD for the first stage in your example; it would be created when the second stage is submitted, and at that point the storage level cannot be changed further.

I'll update the comment (maybe even add a unit test).
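For reference, the user-level sequence being discussed looks roughly like this (illustrative snippet, assuming an active SparkContext sc as in the shell; not code from the PR):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)
rdd.count()                              // first stage: not persisted, so the listener ignores this RDD
rdd.persist(StorageLevel.MEMORY_ONLY_2)  // the requested level, fixed from here on
rdd.count()                              // second stage: the live RDD is created with that level
// Calling persist() again with a different level is rejected by RDD.persist()
// itself, which is why the tracked level can be treated as immutable.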

Contributor

makes sense, thanks for adding the test too.

}

private class LiveRDDPartition(val blockName: String) {
private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {
Contributor

It would help me a bit if this was called 'requestedStorageLevel' and in RDDPartitionInfo it was called 'effectiveStorageLevel'. I guess it's not worth changing RDDPartitionInfo, but maybe just a comment along those lines.

Contributor Author

Sure.

SparkQA commented Oct 5, 2019

Test build #111792 has finished for PR 25779 at commit b95baec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito squito left a comment

lgtm


@asfgit asfgit closed this in d2f21b0 Oct 7, 2019
squito commented Oct 7, 2019

merged to master

@vanzin vanzin deleted the SPARK-27468 branch October 8, 2019 17:00
@dongjoon-hyun
Member

Hi, @vanzin, @squito, and @zsxwing.

SPARK-27468 was reported by @zsxwing against 2.4.1. Can we have this bug fix in branch-2.4, since Apache Spark 2.4 is LTS?

Also, cc @srowen

