[SPARK-10983] Unified memory manager #9084
As of this commit, acquiring execution memory may cause cached blocks to be evicted. There are still a couple of TODOs, notably figuring out what `maxExecutionMemory` and `maxStorageMemory` should actually be. They may not be fixed values, since either side can now borrow from the other.
Without this, we cannot avoid both deadlocks and race conditions, because the individual components (ShuffleMemoryManager, MemoryStore, and MemoryManager) each had their own lock. This commit allows us to simplify several unintuitive control flows that were introduced to avoid acquiring locks in different orders. Since we have only one lock now, these code blocks can be significantly simplified. A foreseeable downside is reduced parallelism, but memory acquisitions and releases don't actually occur that frequently, so performance should not suffer noticeably. This should be investigated further.
Previously, ShuffleMemoryManager would allow each task to acquire up to 1/N of the entire storage + execution region. What we want is more like 1/N of the space not occupied by storage, since the "max" now varies over time.
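The change in the per-task bound can be sketched as follows. This is a hypothetical, simplified illustration; the object and method names are not Spark's actual internals.

```scala
// Hypothetical sketch of the changed per-task execution-memory bound;
// names are illustrative and simplified, not Spark's actual internals.
object TaskMemoryBound {
  // Old model: each of N active tasks could claim up to 1/N of the
  // entire storage + execution region.
  def oldMaxPerTask(maxMemory: Long, numActiveTasks: Int): Long =
    maxMemory / numActiveTasks

  // New model: the cap is 1/N of the space not occupied by storage, so
  // the effective "max" shrinks and grows as cached blocks come and go.
  def newMaxPerTask(maxMemory: Long, storageUsed: Long, numActiveTasks: Int): Long =
    (maxMemory - storageUsed) / numActiveTasks
}
```

With 1000 bytes total, 200 bytes of cached blocks, and 4 active tasks, the old cap would be 250 bytes per task while the new cap is 200 bytes.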
This happened because we were still calling `this.notifyAll()` in ShuffleMemoryManager while holding the `memoryManager` lock. Easy fix.
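A minimal illustration of why that fails (this is a standalone demo, not Spark code): `notifyAll()` is only legal on an object whose monitor the current thread holds, and holding some other lock is not enough.

```scala
// Illustrative demo: the lock below is a stand-in for the memoryManager lock.
object WrongMonitorDemo {
  private val memoryManagerLock = new Object

  def buggyNotify(): Unit = memoryManagerLock.synchronized {
    // We hold memoryManagerLock's monitor, not this object's.
    this.notifyAll() // throws IllegalMonitorStateException
  }

  def fixedNotify(): Unit = memoryManagerLock.synchronized {
    memoryManagerLock.notifyAll() // fine: we hold this object's monitor
  }
}
```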
Tests are passing in this commit, but there are follow-ups that need to be done (see TODOs added in this commit). More tests will be added in the future.
As of this commit, all `*MemoryManagerSuite`s are documented and passing.
Test build #43602 has finished for PR 9084 at commit
// Deprecation message for memory fraction configs used in the old memory management model
private val deprecatedMemoryFractionMessage =
  "As of Spark 1.6, execution and storage memory management are unified. " +
  "All memory fractions used in the old model are now deprecated and no longer read."
The old configurations will still be respected in legacy mode, so this is slightly ambiguous / confusing. Is there an easy way to avoid the warning if the legacy mode configuration is turned on? If not, I suppose we could just expand the deprecation message to mention this corner case, perhaps by just appending a "(unless spark.XX.YY is enabled)" at the end.
I can print a different warning if legacy mode is on
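A hedged sketch of that legacy-aware warning. Only `spark.memory.useLegacyMode` is a real config name quoted from this PR; the helper itself and its placement are illustrative.

```scala
// Illustrative helper: suppress the deprecation warning when legacy mode
// still reads the old fractions.
object DeprecationWarnings {
  def memoryFractionWarning(useLegacyMode: Boolean): Option[String] =
    if (useLegacyMode) {
      None // legacy mode still respects the old fractions, so no warning
    } else {
      Some("As of Spark 1.6, execution and storage memory management are unified. " +
        "All memory fractions used in the old model are now deprecated and no longer read.")
    }
}
```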
Test build #43607 has finished for PR 9084 at commit
@@ -72,46 +92,62 @@ private[spark] abstract class MemoryManager {
  def acquireUnrollMemory(
Given that `acquireUnrollMemory` appears to act as a synonym for `acquireStorageMemory` in the current implementation, it might be worth adding a brief comment above this method to explain that this extra method exists to give us the flexibility to account for unroll memory differently in the future.
Actually, it's more than a synonym. In `StaticMemoryManager` it's required to preserve existing behavior, where unrolling doesn't evict all the blocks.
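The relationship being discussed might be sketched like this. The trait and signatures are simplified illustrations, not Spark's actual API.

```scala
// Simplified sketch of the acquireUnrollMemory/acquireStorageMemory relation.
trait MemoryManagerSketch {
  def acquireStorageMemory(numBytes: Long): Boolean

  // Deliberately a separate method rather than a hard-coded call site alias:
  // it leaves room to account for unroll memory differently later, and a
  // subclass (e.g. a static manager) can override it to bound how many
  // cached blocks unrolling may evict.
  def acquireUnrollMemory(numBytes: Long): Boolean =
    acquireStorageMemory(numBytes)
}
```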
It looks like the test failures are occurring in the external sorter suites. Some of these tests exercise spilling-related logic (such as ensuring that spill files are cleaned up); in order to induce spilling, these tests configured the legacy memory fractions to be really small. In order to fix these tests, we can either enable legacy mode for those tests only or we can modify the tests to set the new configurations. Ideally these sorter unit tests wouldn't be relying on memory management behavior in order to test spill cleanup, but they were written a long time ago. We could refactor these tests, but I'd prefer a band-aid solution for now.
Many of these tests relied on setting a very low shuffle memory fraction. Since that config is now deprecated and not read, the tests now fail because things aren't spilling. Relying on a fraction is, however, quite brittle, as the resulting limit depends on the heap size of the test JVM. Instead, we should explicitly set a limit on the memory used.
Unfortunately, not all affected tests can be easily rewritten. Instead, this commit adds TODOs in places where the tests may not actually do anything, so we can fix them in the future (SPARK-11078).
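The fix described above, granting memory up to an explicit byte limit so spills become deterministic regardless of the test JVM's heap size, could look roughly like this. The class and method names here are hypothetical stand-ins, not the actual test utilities.

```scala
// Illustrative stand-in for a test-only memory granter with a fixed cap.
class FixedLimitGranter(limitBytes: Long) {
  private var used = 0L

  // Returns the bytes actually granted; a partial grant (< requested)
  // is the caller's cue to spill.
  def tryAcquire(requested: Long): Long = synchronized {
    val granted = math.min(requested, limitBytes - used)
    used += granted
    granted
  }
}
```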
Test build #43622 has finished for PR 9084 at commit
Test build #43631 has finished for PR 9084 at commit
retest this please
Test build #43646 has finished for PR 9084 at commit
Test build #1890 has finished for PR 9084 at commit
Test build #1888 has finished for PR 9084 at commit
Test build #1889 has finished for PR 9084 at commit
Test build #1891 has finished for PR 9084 at commit
LGTM. I'm going to merge this now. There are some follow-up tasks around refactoring the old spilling tests, but those are tracked separately and we'll do them soon.
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from the other. When memory pressure arises, storage will be evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict. Several configurations are introduced:

- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.
- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.
- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.

For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in apache#9000.

Author: Andrew Or <andrew@databricks.com>

Closes apache#9084 from andrewor14/unified-memory-manager.
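As a worked example of how the two fractions above carve up the heap, using the quoted defaults (the fractions come from the PR description; the helper itself is an illustration, not Spark's code):

```scala
// Illustrative region sizing using the defaults quoted in the PR description.
object UnifiedRegionSizes {
  // Returns (execution + storage budget, eviction-protected storage region).
  def regions(heapBytes: Long,
              memoryFraction: Double = 0.75,
              storageFraction: Double = 0.5): (Long, Long) = {
    val unified = (heapBytes * memoryFraction).toLong       // spark.memory.fraction
    val storageRegion = (unified * storageFraction).toLong  // spark.memory.storageFraction
    (unified, storageRegion)
  }
}
```

For a 1000-byte heap, the unified region is 750 bytes, of which 375 bytes is the storage region that execution cannot evict into.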
#9084 uncovered that many of our spilling tests don't actually spill. This is a follow-up patch to fix that, to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic. Author: Andrew Or <andrew@databricks.com> Closes #9124 from andrewor14/spilling-tests.
@@ -40,6 +41,10 @@ private[spark] abstract class MemoryManager {
    _memoryStore
  }

  // Amount of execution/storage memory in use, accesses must be synchronized on `this`
  protected var _executionMemoryUsed: Long = 0
why is this `protected`?
@pzz2011 you're making several comments on old PRs. Generally people won't see that and it's not the place for discussion anyway. If you can formulate a specific question beyond "why is the code this way?" ask on user@.