[#1472][part-5] Use UnpooledByteBufAllocator to obtain accurate ByteBuf sizes to fix inaccurate usedMemory issue causing OOM #1534
Codecov Report
Attention:
Additional details and impacted files
@@ Coverage Diff @@
## master #1534 +/- ##
============================================
+ Coverage 54.17% 55.12% +0.94%
- Complexity 2822 2824 +2
============================================
Files 435 415 -20
Lines 24501 22155 -2346
Branches 2074 2079 +5
============================================
- Hits 13274 12212 -1062
+ Misses 10397 9179 -1218
+ Partials 830 764 -66
☔ View full report in Codecov by Sentry.
Why does the Rust CI always fail?
What's the initial motivation of using
In the previous implementation, the pool cache was not used either.
@@ -80,11 +83,16 @@ public class ShuffleBufferManager {
  protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

  public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
    this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
Why not use RPC_SERVER_TYPE=GRPC_NETTY to decide this?
Done.
@@ -80,11 +83,16 @@ public class ShuffleBufferManager {
  protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

  public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
    this.nettyServerEnabled = conf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
    long heapSize = Runtime.getRuntime().maxMemory();
PlatformDependent.maxDirectMemory() already contains the logic of Runtime.getRuntime().maxMemory(). Can the two be merged?
We allow users to configure the maximum direct memory through the variable MAX_DIRECT_MEMORY_SIZE in the rss-env.sh script, so the value of PlatformDependent.maxDirectMemory() may differ from the value of Runtime.getRuntime().maxMemory(). Therefore we cannot merge this part of the code.
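To illustrate why the two limits can diverge, here is a minimal JDK-only sketch. It assumes that MAX_DIRECT_MEMORY_SIZE ends up as the JVM flag -XX:MaxDirectMemorySize (an assumption, the script itself is not shown in this thread) and that, when the flag is absent, the direct memory limit falls back to the heap limit. The class and method names are hypothetical, and this is not Netty's actual PlatformDependent implementation.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

public class MemoryLimits {

  /** Parses a JVM-style size such as "512m" or "4g" into bytes. */
  public static long parseSize(String text) {
    String s = text.trim().toLowerCase();
    long multiplier = 1L;
    char unit = s.charAt(s.length() - 1);
    if (unit == 'k') {
      multiplier = 1L << 10;
    } else if (unit == 'm') {
      multiplier = 1L << 20;
    } else if (unit == 'g') {
      multiplier = 1L << 30;
    }
    if (multiplier != 1L) {
      s = s.substring(0, s.length() - 1);
    }
    return Long.parseLong(s) * multiplier;
  }

  /**
   * Returns the configured direct memory limit, or heapLimit when the flag is absent.
   * Assumption: the last occurrence of the flag wins, as with most JVM options.
   */
  public static long directLimit(List<String> jvmArgs, long heapLimit) {
    String prefix = "-XX:MaxDirectMemorySize=";
    long limit = heapLimit;
    for (String arg : jvmArgs) {
      if (arg.startsWith(prefix)) {
        limit = parseSize(arg.substring(prefix.length()));
      }
    }
    return limit;
  }

  public static void main(String[] args) {
    long heap = Runtime.getRuntime().maxMemory();
    List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
    System.out.println("heap limit   = " + heap);
    System.out.println("direct limit = " + directLimit(jvmArgs, heap));
  }
}
```

Running this with a small heap and a large -XX:MaxDirectMemorySize prints two different numbers, which is the case the reply above is describing.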
5da2661 to bcbd1be
bcbd1be to d159181
Yeah, that seems right. Because the parameters
c53111c to 408ffcf
408ffcf to d84cf09
Overall LGTM. Left some minor comments
@@ -256,6 +256,7 @@ public void sendShuffleData(
    final long start = System.currentTimeMillis();
    List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(req);
    long alreadyReleasedSize = 0;
    boolean hasFailureOccurred = false;
This is not related to this PR. You can submit another PR to fix this.
The code logic of GRPC and Netty is basically the same. To solve the problem of usedMemory being inaccurate in the Netty scenario, I fixed the Netty path and made the same changes on the GRPC side.
So, do we keep the changes for hasFailureOccurred only in Netty? It would be a bit weird to modify the same logic only for the Netty part.
Got it. Let's keep this.
@@ -80,11 +84,16 @@ public class ShuffleBufferManager {
  protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = JavaUtils.newConcurrentMap();

  public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager) {
    this.nettyServerEnabled = conf.get(ShuffleServerConf.RPC_SERVER_TYPE) == ServerType.GRPC_NETTY;
This could be defined by the ShuffleServer
Do you want to pass nettyServerEnabled from ShuffleServer into ShuffleBufferManager?
Yes
This should be fixed.
The single source principle should be ensured. Also, the checking logic is not the same; see the current code in ShuffleServer.java:
nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
if (nettyServerEnabled) {
  streamServer = new StreamServer(this);
}
Ok. let me merge this.
Done in #1540.
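For readers following this thread, the "single source" idea can be sketched as below: the server derives the flag once and hands it to the buffer manager, so the two classes cannot disagree. All class and method names here are hypothetical stand-ins, not the code merged in #1540.

```java
public class SingleSourceSketch {

  /** Hypothetical stand-in for the buffer manager: it receives the flag, never derives it. */
  public static final class BufferManager {
    private final boolean nettyServerEnabled;

    public BufferManager(boolean nettyServerEnabled) {
      this.nettyServerEnabled = nettyServerEnabled;
    }

    public boolean isNettyServerEnabled() {
      return nettyServerEnabled;
    }
  }

  /** Hypothetical stand-in for the server: the only place where the check is written. */
  public static final class Server {
    private final boolean nettyServerEnabled;
    private final BufferManager bufferManager;

    public Server(int nettyServerPort) {
      // Same shape of check as quoted in the thread: a non-negative port enables Netty.
      this.nettyServerEnabled = nettyServerPort >= 0;
      this.bufferManager = new BufferManager(nettyServerEnabled);
    }

    public boolean isNettyServerEnabled() {
      return nettyServerEnabled;
    }

    public BufferManager bufferManager() {
      return bufferManager;
    }
  }

  public static void main(String[] args) {
    Server server = new Server(19999);
    System.out.println(server.isNettyServerEnabled() == server.bufferManager().isNettyServerEnabled());
  }
}
```

With this shape, changing how the flag is computed touches one line in the server, and the buffer manager follows automatically.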
@@ -465,7 +493,17 @@ void requirePreAllocatedSize(long delta) {
  }

  public void releasePreAllocatedSize(long delta) {
    preAllocatedSize.addAndGet(-delta);
    if (preAllocatedSize.get() >= delta) {
It could be optimized like this:
// addAndGet on an AtomicLong returns a long, so the result is kept as a long.
long allocated = preAllocatedSize.addAndGet(-delta);
if (allocated < 0) {
  LOG.warn(
      "Current pre-allocated memory["
          + (allocated + delta) // value before this release
          + "] is less than released["
          + delta
          + "], set pre-allocated memory to 0");
  preAllocatedSize.set(0L);
}
I'm just following the same pattern as in releaseMemory, releaseFlushMemory and releaseReadMemory.
Maybe we can refactor the pattern in all of the above methods, as you suggested, later in another PR?
Ok
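As a note for the follow-up refactoring discussed above: both the check-then-subtract form in the diff and the subtract-then-reset form in the suggestion use two separate atomic operations, so a concurrent release can interleave between them. One possible alternative, sketched here with a hypothetical class and not taken from the project, folds the subtraction and the clamp to zero into a single atomic update.

```java
import java.util.concurrent.atomic.AtomicLong;

public class PreAllocatedCounter {
  private final AtomicLong preAllocatedSize = new AtomicLong();

  /** Records delta bytes as pre-allocated. Assumes delta is non-negative. */
  public void require(long delta) {
    preAllocatedSize.addAndGet(delta);
  }

  /**
   * Releases up to delta bytes and never lets the counter go below zero.
   * The subtraction and the clamp happen in one atomic update.
   *
   * @return the number of bytes actually released
   */
  public long release(long delta) {
    long previous = preAllocatedSize.getAndUpdate(current -> Math.max(0L, current - delta));
    return Math.min(previous, delta);
  }

  public long get() {
    return preAllocatedSize.get();
  }

  public static void main(String[] args) {
    PreAllocatedCounter counter = new PreAllocatedCounter();
    counter.require(100);
    long released = counter.release(150);
    System.out.println("released=" + released + ", remaining=" + counter.get());
  }
}
```

The return value lets a caller log a warning when fewer bytes were released than requested, which keeps the diagnostic from the original pattern without a second read of the counter.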
There is a flaky test. You can see https://github.com/apache/incubator-uniffle/actions/runs/7983021033/job/21797479973
I don't think it's related to this PR. It does not happen every time; I'll take a closer look in the next couple of days.
… when failing to cache shuffle data (#1597)

### What changes were proposed in this pull request?
Release memory more accurately when failing to cache shuffle data.

### Why are the changes needed?
A follow-up PR for: #1534.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.
What changes were proposed in this pull request?
When we use UnpooledByteBufAllocator to allocate off-heap ByteBuf, Netty directly requests off-heap memory from the operating system instead of allocating it according to pageSize and chunkSize. This way, we can obtain the exact ByteBuf size during the pre-allocation of memory, avoiding distortion of metrics such as usedMemory.
Moreover, we have restored the changes from PR #1521. We ensure that there is sufficient direct memory for the Netty server while decoding sendShuffleDataRequest by taking the encodedLength of the ByteBuf into account in advance during the pre-allocation of memory, thus avoiding OOM while decoding sendShuffleDataRequest.
Since we are not using PooledByteBufAllocator, PR #1524 is no longer needed.
Why are the changes needed?
A sub PR for: #1519
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.
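The accounting gap mentioned in the description can be pictured with a deliberately simplified, JDK-only model. It assumes a pooled allocator reserves memory from the operating system in whole chunks of pageSize << maxOrder bytes, while an unpooled allocator reserves exactly what is requested. The class name, the method names, and the example values 8192 and 11 are illustrative assumptions; real pooled allocators also have arenas, caches and special handling for large requests that this sketch ignores.

```java
public class AllocatorAccountingSketch {

  /** Chunk size of the modelled pool: pageSize shifted left by maxOrder. */
  public static long chunkSize(int pageSize, int maxOrder) {
    return (long) pageSize << maxOrder;
  }

  /** Bytes the modelled pool reserves from the OS to serve requestedBytes. */
  public static long reservedInChunks(long requestedBytes, long chunkSize) {
    if (requestedBytes <= 0) {
      return 0L;
    }
    long chunks = (requestedBytes + chunkSize - 1) / chunkSize; // round up to whole chunks
    return chunks * chunkSize;
  }

  /** Bytes an exact (unpooled-style) allocation reserves: the request itself. */
  public static long reservedExactly(long requestedBytes) {
    return Math.max(0L, requestedBytes);
  }

  public static void main(String[] args) {
    long chunk = chunkSize(8192, 11); // illustrative values, not read from any configuration
    long request = 1024L * 1024L + 1L;
    System.out.println("requested      = " + request);
    System.out.println("pooled model   = " + reservedInChunks(request, chunk));
    System.out.println("unpooled model = " + reservedExactly(request));
  }
}
```

In this model, a counter that tracks requested bytes matches the unpooled reservation but can sit far below the pooled one, which is the kind of drift in usedMemory that the description attributes to chunk-based allocation.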