[SPARK-27468][core] Track correct storage level of RDDs and partitions. #25779
Conversation
Test build #110542 has finished for PR 25779 at commit

Test build #110543 has finished for PR 25779 at commit

Ping? Anyone? Bueller?

Let's try... @squito @attilapiros

Test build #111456 has finished for PR 25779 at commit

Test build #111489 has finished for PR 25779 at commit
// unpersisted. First level key is the RDD id, second is the split index.
//
// This is a var so that we can "clear" it and restart with a new map when it becomes empty,
// to save some memory.
can you change this to a scaladoc comment? (for tooltips in my IDE)
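For reference, a minimal sketch of the same comment converted to Scaladoc; the surrounding class and the map's value type are placeholders, not the actual AppStatusListener field, and the "..." marks the part of the original comment that is truncated above.

class BlockUsageExample {
  import scala.collection.mutable

  /**
   * ... unpersisted. First level key is the RDD id, second is the split index.
   *
   * This is a var so that we can "clear" it and restart with a new map when it becomes empty,
   * to save some memory.
   */
  private var rddBlockUsage = new mutable.HashMap[Int, mutable.HashMap[Int, Long]]()
}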
assert(exec.usedOnHeap === 1)
assert(exec.usedOffHeap === 0)

val old1 = exec.addBlock(RDDBlockId(1, 1), 3L, 3L, false)
what's the scenario where you'd get another update for the same block, without a removal event in the middle? Just if there is a dropped event?
IIRC, if a block is evicted from memory to disk, for example; and later if it's loaded back into memory.
ah good point. worth a comment? that case usually slips my mind, anyway
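To illustrate that scenario, here is a rough sketch of the listener events involved. The block manager id, block id, storage levels, and sizes are all made up for the sketch; whether the reported sizes are deltas or current totals is exactly what gets discussed later in this PR.

import org.apache.spark.scheduler.SparkListenerBlockUpdated
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo, RDDBlockId, StorageLevel}

object EvictionScenario {
  val bm = BlockManagerId("1", "1.example.com", 42)
  val block = RDDBlockId(1, 1)

  // The block is first cached in memory under the requested MEMORY_AND_DISK level...
  val cached = SparkListenerBlockUpdated(
    BlockUpdatedInfo(bm, block, StorageLevel.MEMORY_AND_DISK, 3L, 0L))
  // ...then evicted from memory to disk (reported roughly as a disk-only status, with no removal event)...
  val evicted = SparkListenerBlockUpdated(
    BlockUpdatedInfo(bm, block, StorageLevel.DISK_ONLY, 0L, 3L))
  // ...and later loaded back into memory, producing another update for the same block.
  val reloaded = SparkListenerBlockUpdated(
    BlockUpdatedInfo(bm, block, StorageLevel.MEMORY_AND_DISK, 3L, 3L))
}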
assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

val part = wrapper.info.partitions.get(0)
assert(part.memoryUsed === rdd1b1.memSize * 2)
unrelated to your change, but for the line just above (wrapper.info.partitions.get(0)), would it be better to have wrapper.info.partitions.get.find(_.blockName == rdd1b1.blockId.name)? Or at least an assert after that, checking that you have the right block? Otherwise it will lead to confusing errors later if somebody changes the test and rdd1b1 isn't added first.
Also unrelated to your change, but coming back to this code after a long time I was a little surprised that info.partitions isn't ordered by partition; that may also be worth a comment somewhere.
Sure, I can add an assert.
The list is not ordered for performance reasons. See RDDPartitionSeq in LiveEntity.scala.
(actually the assert / change is not needed since a few lines above there's an assertion that the list has a single element.)
ok fair enough.
I did realize why RDDPartitionSeq is the way it is after a closer look; I was just mentioning that it's not so obvious, or commented in the places I expected (the API just exposes a Seq[RDDPartitionInfo]).
}
}
}
update(rdd, now)
it looks like you went from conditionally updating rdd to always updating it, was that intentional? if so, why?
Hmm, not sure where you're seeing the conditional update? This line is executed for all live RDDs in the old code.
I guess the update in this line was unnecessary?
spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
Lines 219 to 221 in 4c8f114
if (rdd.removeDistribution(exec)) {
  update(rdd, now)
}
I think this is the update which caused the confusion for @squito too.
right, it was that part, combined with misreading this part of the old code. It seems like it could be a conditional update, but that's no change from before.
Test build #111504 has finished for PR 25779 at commit

retest this please

Test build #111611 has started for PR 25779 at commit
attilapiros left a comment
This was just a quick pass through the changes; I would like to read it through more carefully tomorrow (CET time).
 * This is a var so that we can "clear" it and restart with a new map when it becomes empty,
 * to save some memory.
 */
private var rddBlockUsage = new OpenHashMap[Int, RDDBlockTracker]()
This can be a val
No. You also missed the comment on the very previous line.
Yes, I missed that one. It is strange that the map is not clearable, but in that case this is fine the way it is.
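A minimal sketch of the pattern the comment describes, using placeholder types rather than the real AppStatusListener internals: the field has to be a var because "clearing" is done by dropping the whole map and allocating a fresh one once it becomes empty.

import scala.collection.mutable

class BlockUsageTracker {
  private var rddBlockUsage = new mutable.HashMap[Int, mutable.Set[Int]]()

  def addBlock(rddId: Int, split: Int): Unit = {
    rddBlockUsage.getOrElseUpdate(rddId, mutable.Set[Int]()) += split
  }

  def removeRdd(rddId: Int): Unit = {
    rddBlockUsage.remove(rddId)
    if (rddBlockUsage.isEmpty) {
      // "Clear" by replacing the map, releasing the old backing storage.
      rddBlockUsage = new mutable.HashMap[Int, mutable.Set[Int]]()
    }
  }
}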
squito left a comment
lgtm
- private val partitions = new HashMap[String, LiveRDDPartition]()
+ private val partitions = new OpenHashMap[Int, LiveRDDPartition]()
  private val partitionSeq = new RDDPartitionSeq()
  private var partitionCount = 0
Why is a separate partitionCount needed? The partitionSeq has a length method which tracks partition adds and removes.
  // Remove block 1 from bm 1.
  listener.onBlockUpdated(SparkListenerBlockUpdated(
-   BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize)))
+   BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0L, 0L)))
I assume this was a mistake in the test, since the delta was being reported here rather than the currently used memory and disk sizes. That is why this line is changed.
Test build #111623 has finished for PR 25779 at commit
We might need to inform the patch for #25943 about the change to LiveEntity, otherwise we might forget to include it in the KVStore snapshot. Also, according to the javadoc, could we remove the exceptional case as well, or is there a technical reason to leave it as it is?

Yeah, I'll have to take a second look at that code, run some more experiments. That's a really awkward API... let me see if there's a way to end up with the correct info without having to change the block manager code.
I'll take a closer look tomorrow, but a quick experiment tells me that:
The thing with the second case is that the code path that triggers it doesn't send an update to the master. I'm actually working on a feature where that would happen, though. But given that, I'll probably undo some of the changes here, mainly relying on the fact that the block update contains the delta for the memory, and that disk blocks are never dropped outside of unpersist events. For my WIP changes (for which I'll eventually post a PR) I'll look at reporting the correct sizes when dropping blocks in the second case above.

(BTW it could be that the problem in the second bullet above is fixed by #25973. I'll look more carefully tomorrow.)
Right, #25973 is meant to deal with this, though the original target of #25973 was broadcast. So if we just want to allow two different semantics of "size" on UpdateBlockInfo (either "addition" or "removal" depending on storage level), I guess #25973 is correct. If we feel awkward about the current logic and only want to report the "current" status of a block, we may have to make a non-trivial change, and then #25973 will no longer be correct.
Ok, running some tests I found a couple of extra bugs. I'll file a new bug with the details, and then update this one with the fix for SPARK-27468 specifically. After my investigations, #25973 seems correct, but it does not fix the new bugs that I found in my testing.
Filed SPARK-29319. I'll update this with the fixes that aren't related to that bug. |
Previously, the RDD level would change depending on the status reported by executors for the block they were storing, and individual blocks would reflect that. That is wrong because different blocks may be stored differently in different executors. So now the RDD tracks the user-provided storage level, while the individual partitions reflect the current storage level of that particular block, including the current number of replicas.
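As a user-level illustration of that distinction (the local-mode setup and names below are assumptions for the sketch, not code from this PR): the RDD as a whole is now reported with the level passed to persist(), while each cached partition is reported with whatever level its block actually ended up at on the executors.

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object RequestedVsEffectiveLevel {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "requested-vs-effective")
    try {
      val rdd = sc.parallelize(1 to 1000, 4)
      // The requested level: after this change, this is what the RDD itself shows in the UI.
      rdd.persist(StorageLevel.MEMORY_AND_DISK)
      rdd.count()
      // Individual partitions may still be reported at a different effective level
      // (e.g. disk only if a block was evicted from memory), with their own replica count.
    } finally {
      sc.stop()
    }
  }
}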
Test build #111652 has finished for PR 25779 at commit

Ping
 * Tracker for data related to a persisted RDD. Note the storage level is kept immutable here,
 * following the current behavior of `RDD.persist()`, even though it is mutable in the `RDDInfo`
 * structure.
 */
A stage may have been submitted before the RDD was persisted at all, then another stage submitted after the RDD is persisted, so it's not actually immutable. You wouldn't properly capture that here.
(probably not the optimal thing for the user to do, but I've seen weirder things ...)
Actually that works fine, because the listener does not track RDDs that are not persisted; so there wouldn't be a live RDD for the first stage in your example; it would be created when the second stage is submitted, and at that point the storage level cannot be changed further.
I'll update the comment (maybe even add a unit test).
makes sense, thanks for adding the test too.
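For reference, a sketch of the scenario being discussed (the local-mode setup is an assumption): the first stage runs before persist(), so the listener has no live RDD to track yet; the live RDD is only created when the second stage is submitted, and RDD.persist() refuses to change the level once it has been assigned.

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object PersistAfterFirstStage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "persist-after-first-stage")
    try {
      val rdd = sc.parallelize(1 to 100, 2)
      rdd.count()                            // first stage: RDD not persisted, not tracked by the listener
      rdd.persist(StorageLevel.MEMORY_ONLY)  // level assigned before the next stage is submitted
      rdd.count()                            // second stage: the RDD is now tracked with this level
      // rdd.persist(StorageLevel.DISK_ONLY) // would throw: the level cannot be changed afterwards
    } finally {
      sc.stop()
    }
  }
}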
}

- private class LiveRDDPartition(val blockName: String) {
+ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {
It would help me a bit if this was called 'requestedStorageLevel' and in RDDPartitionInfo it was called 'effectiveStorageLevel'. I guess it's not worth changing RDDPartitionInfo, but maybe just a comment along those lines.
Sure.
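A small sketch of the suggested naming; the real LiveRDDPartition in LiveEntity.scala carries more state, so this only illustrates the requested-vs-effective distinction.

import org.apache.spark.storage.StorageLevel

// Level requested via RDD.persist(); the level exposed through RDDPartitionInfo is the
// effective level of this particular block, which may differ (e.g. after eviction to disk).
private class LiveRDDPartition(val blockName: String, requestedStorageLevel: StorageLevel)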
Test build #111792 has finished for PR 25779 at commit
squito left a comment
lgtm
merged to master