-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-2468] Netty based block server / client module #1907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
QA tests have started for PR 1907. This patch merges cleanly. |
QA results for PR 1907: |
@rxin Very nice ! Do you have any benchmarks of how fast things are ? In terms of say % of network bandwidth we can use ? |
} | ||
} | ||
} | ||
val region = new DefaultFileRegion(fileChannel, offset, blockLength) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: make sure DefaultFileRegion can be released properly in error conditions.
QA tests have started for PR 1907. This patch merges cleanly. |
That's part of the TODO :) |
QA results for PR 1907: |
QA tests have started for PR 1907. This patch merges cleanly. |
QA results for PR 1907: |
* The buffer's life cycle is NOT managed by the JVM, and thus requiring explicit declaration of | ||
* reference by the retain method and release method. | ||
*/ | ||
class ReferenceCountedBuffer(underlying: ByteBuf) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could make this class a value class. Based off this patch, the runtime wouldn't have ReferenceCountedBuffer objects created.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea
@rxin |
I'm not sure what akka.io would bring here. The main benefit seems to be more native Scala APIs. In this case I am not sure if we care since this is a low level module. Akka's abstraction is fairly high level, which can make stuff harder to debug for a low level module. Netty is presumably more robust (due to the much larger user base). I'm also not sure whether akka-io supports using transferTo directly under the hood and managed buffers to reduce GC (my quick Google search suggests that it doesn't). |
QA tests have started for PR 1907. This patch merges cleanly. |
QA results for PR 1907: |
QA tests have started for PR 1907. This patch merges cleanly. |
QA tests have started for PR 1907. This patch merges cleanly. |
QA results for PR 1907: |
QA results for PR 1907: |
QA tests have started for PR 1907. This patch merges cleanly. |
QA tests have started for PR 1907. This patch merges cleanly. |
QA results for PR 1907: |
One more thing -- I just remembered that a recent PR (#1632) was trying to handle cases where the server never replies to blockId requests. Can this happen with the Netty code path ? Or is there some ack-timeout built into netty ? |
I think client can make use of ReadTimeoutHandler |
That's a good one to add. But again I wouldn't want to put everything into a single PR ... |
Yeah, you can probably make a JIRA for it or something like that. I just remembered that this is an issue, so I put it here :) |
Yup. Created https://issues.apache.org/jira/browse/SPARK-2468 This should be pretty easy to do. |
I reviewed the changes that are relevant to spark core and they LGTM |
Thanks for looking at this. Merging it in master & branch-1.1. This is off by default so the danger is very small. We will continue improving it and with changes like SPARK-3019 it will make it easier to use. |
I have know it,thanks. ------------------ 原始邮件 ------------------ 主题: Re: [spark] [SPARK-2468] Netty based block server / client module(#1907) Thanks for looking at this. Merging it in master & branch-1.1. This is off by default so the danger is very small. We will continue improving it and with changes like SPARK-3019 it will make it easier to use. — |
I have know it,thanks. ------------------ 原始邮件 ------------------ 主题: Re: [spark] [SPARK-2468] Netty based block server / client module(#1907) — |
This is a rewrite of the original Netty module that was added about 1.5 years ago. The old code was turned off by default and didn't really work because it lacked a frame decoder (only worked with very very small blocks). For this pull request, I tried to make the changes non-instrusive to the rest of Spark. I only added an init and shutdown to BlockManager/DiskBlockManager, and a bunch of comments to help me understand the existing code base. Compared with the old Netty module, this one features: - It appears to work :) - SPARK-2941: option to specicy nio vs oio vs epoll for channel/transport. By default nio is used. (Not using Epoll yet because I have found some bugs with its implementation) - SPARK-2943: options to specify send buf and receive buf for users who want to do hyper tuning - SPARK-2942: io errors are reported from server to client (the protocol uses negative length to indicate error) - SPARK-2940: fetching multiple blocks in a single request to reduce syscalls - SPARK-2959: clients share a single thread pool - SPARK-2990: use PooledByteBufAllocator to reduce GC (basically a Netty managed pool of buffers with jmalloc) - SPARK-2625: added fetchWaitTime metric and fixed thread-safety issue in metrics update. - SPARK-2367: bump Netty version to 4.0.21.Final to address an Epoll bug (https://groups.google.com/forum/#!topic/netty/O7m-HxCJpCA) Compared with the existing communication manager, this one features: - IMO it is substantially easier to understand - zero-copy send for the server for on-disk blocks - one-copy receive (due to a frame decoder) - don't quote me on this, but I think a lot less sys calls - SPARK-2990: use PooledByteBufAllocator to reduce GC (basically a Netty managed pool of buffers with jmalloc) - SPARK-2941: option to specicy nio vs oio vs epoll for channel/transport. By default nio is used. (Not using Epoll yet because I have found some bugs with its implementation) - SPARK-2943: options to specify send buf and receive buf for users who want to do hyper tuning TODOs before it can fully replace the existing ConnectionManager, if that ever happens (most of them should probably be done in separate PRs since this needs to be turned on explicitly) - [x] Basic test cases - [ ] More unit/integration tests for failures - [ ] Performance analysis - [ ] Support client connection reuse so we don't need to keep opening new connections (not sure how useful this would be) - [ ] Support putting blocks in addition to fetching blocks (i.e. two way transfer) - [x] Support serving non-disk blocks - [ ] Support SASL authentication For a more comprehensive list, see https://issues.apache.org/jira/browse/SPARK-2468 Thanks to @coderplay for peer coding with me on a Sunday. Author: Reynold Xin <rxin@apache.org> Closes #1907 from rxin/netty and squashes the following commits: f921421 [Reynold Xin] Upgrade Netty to 4.0.22.Final to fix another Epoll bug. 4b174ca [Reynold Xin] Shivaram's code review comment. 4a3dfe7 [Reynold Xin] Switched to nio for default (instead of epoll on Linux). 56bfb9d [Reynold Xin] Bump Netty version to 4.0.21.Final for some bug fixes. b443a4b [Reynold Xin] Added debug message to help debug Jenkins failures. 57fc4d7 [Reynold Xin] Added test cases for BlockHeaderEncoder and BlockFetchingClientHandlerSuite. 22623e9 [Reynold Xin] Added exception handling and test case for BlockServerHandler and BlockFetchingClientHandler. 6550dd7 [Reynold Xin] Fixed block mgr init bug. 60c2edf [Reynold Xin] Beefed up server/client integration tests. 38d88d5 [Reynold Xin] Added missing test files. 6ce3f3c [Reynold Xin] Added some basic test cases. 47f7ce0 [Reynold Xin] Created server and client packages and moved files there. b16f412 [Reynold Xin] Added commit count. f13022d [Reynold Xin] Remove unused clone() in BlockFetcherIterator. c57d68c [Reynold Xin] Added back missing files. 842dfa7 [Reynold Xin] Made everything work with proper reference counting. 3fae001 [Reynold Xin] Connected the new netty network module with rest of Spark. 1a8f6d4 [Reynold Xin] Completed protocol documentation. 2951478 [Reynold Xin] New Netty implementation. cc7843d [Reynold Xin] Basic skeleton. (cherry picked from commit 3a8b68b) Signed-off-by: Reynold Xin <rxin@apache.org>
@@ -86,13 +89,25 @@ private[spark] class BlockManager( | |||
new TachyonStore(this, tachyonBlockManager) | |||
} | |||
|
|||
private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since useNetty is now a member val, maybe we can change the lines in getMultiple() to use this val instead of the original boolean too. Check out below lines:
...
getMultiple(){
...
val iter =
if (conf.getBoolean("spark.shuffle.use.netty", false)) {
new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer,
readMetrics)
} else {
Thanks for re-implementing this. Really nice to see this working :) |
Previous pull request (#1907) was reverted. This brings it back. Still looking into the hang. Author: Reynold Xin <rxin@apache.org> Closes #1971 from rxin/netty1 and squashes the following commits: b0be96f [Reynold Xin] Added test to make sure outstandingRequests are cleaned after firing the events. 4c6d0ee [Reynold Xin] Pass callbacks cleanly. 603dce7 [Reynold Xin] Upgrade Netty to 4.0.23 to fix the DefaultFileRegion bug. 88be1d4 [Reynold Xin] Downgrade to 4.0.21 to work around a bug in writing DefaultFileRegion. 002626a [Reynold Xin] Remove netty-test-file.txt. db6e6e0 [Reynold Xin] Revert "Revert "[SPARK-2468] Netty based block server / client module"" (cherry picked from commit 8b9dc99) Signed-off-by: Reynold Xin <rxin@apache.org>
Previous pull request (#1907) was reverted. This brings it back. Still looking into the hang. Author: Reynold Xin <rxin@apache.org> Closes #1971 from rxin/netty1 and squashes the following commits: b0be96f [Reynold Xin] Added test to make sure outstandingRequests are cleaned after firing the events. 4c6d0ee [Reynold Xin] Pass callbacks cleanly. 603dce7 [Reynold Xin] Upgrade Netty to 4.0.23 to fix the DefaultFileRegion bug. 88be1d4 [Reynold Xin] Downgrade to 4.0.21 to work around a bug in writing DefaultFileRegion. 002626a [Reynold Xin] Remove netty-test-file.txt. db6e6e0 [Reynold Xin] Revert "Revert "[SPARK-2468] Netty based block server / client module""
This is a rewrite of the original Netty module that was added about 1.5 years ago. The old code was turned off by default and didn't really work because it lacked a frame decoder (only worked with very very small blocks). For this pull request, I tried to make the changes non-instrusive to the rest of Spark. I only added an init and shutdown to BlockManager/DiskBlockManager, and a bunch of comments to help me understand the existing code base. Compared with the old Netty module, this one features: - It appears to work :) - SPARK-2941: option to specicy nio vs oio vs epoll for channel/transport. By default nio is used. (Not using Epoll yet because I have found some bugs with its implementation) - SPARK-2943: options to specify send buf and receive buf for users who want to do hyper tuning - SPARK-2942: io errors are reported from server to client (the protocol uses negative length to indicate error) - SPARK-2940: fetching multiple blocks in a single request to reduce syscalls - SPARK-2959: clients share a single thread pool - SPARK-2990: use PooledByteBufAllocator to reduce GC (basically a Netty managed pool of buffers with jmalloc) - SPARK-2625: added fetchWaitTime metric and fixed thread-safety issue in metrics update. - SPARK-2367: bump Netty version to 4.0.21.Final to address an Epoll bug (https://groups.google.com/forum/#!topic/netty/O7m-HxCJpCA) Compared with the existing communication manager, this one features: - IMO it is substantially easier to understand - zero-copy send for the server for on-disk blocks - one-copy receive (due to a frame decoder) - don't quote me on this, but I think a lot less sys calls - SPARK-2990: use PooledByteBufAllocator to reduce GC (basically a Netty managed pool of buffers with jmalloc) - SPARK-2941: option to specicy nio vs oio vs epoll for channel/transport. By default nio is used. (Not using Epoll yet because I have found some bugs with its implementation) - SPARK-2943: options to specify send buf and receive buf for users who want to do hyper tuning TODOs before it can fully replace the existing ConnectionManager, if that ever happens (most of them should probably be done in separate PRs since this needs to be turned on explicitly) - [x] Basic test cases - [ ] More unit/integration tests for failures - [ ] Performance analysis - [ ] Support client connection reuse so we don't need to keep opening new connections (not sure how useful this would be) - [ ] Support putting blocks in addition to fetching blocks (i.e. two way transfer) - [x] Support serving non-disk blocks - [ ] Support SASL authentication For a more comprehensive list, see https://issues.apache.org/jira/browse/SPARK-2468 Thanks to @coderplay for peer coding with me on a Sunday. Author: Reynold Xin <rxin@apache.org> Closes apache#1907 from rxin/netty and squashes the following commits: f921421 [Reynold Xin] Upgrade Netty to 4.0.22.Final to fix another Epoll bug. 4b174ca [Reynold Xin] Shivaram's code review comment. 4a3dfe7 [Reynold Xin] Switched to nio for default (instead of epoll on Linux). 56bfb9d [Reynold Xin] Bump Netty version to 4.0.21.Final for some bug fixes. b443a4b [Reynold Xin] Added debug message to help debug Jenkins failures. 57fc4d7 [Reynold Xin] Added test cases for BlockHeaderEncoder and BlockFetchingClientHandlerSuite. 22623e9 [Reynold Xin] Added exception handling and test case for BlockServerHandler and BlockFetchingClientHandler. 6550dd7 [Reynold Xin] Fixed block mgr init bug. 60c2edf [Reynold Xin] Beefed up server/client integration tests. 38d88d5 [Reynold Xin] Added missing test files. 6ce3f3c [Reynold Xin] Added some basic test cases. 47f7ce0 [Reynold Xin] Created server and client packages and moved files there. b16f412 [Reynold Xin] Added commit count. f13022d [Reynold Xin] Remove unused clone() in BlockFetcherIterator. c57d68c [Reynold Xin] Added back missing files. 842dfa7 [Reynold Xin] Made everything work with proper reference counting. 3fae001 [Reynold Xin] Connected the new netty network module with rest of Spark. 1a8f6d4 [Reynold Xin] Completed protocol documentation. 2951478 [Reynold Xin] New Netty implementation. cc7843d [Reynold Xin] Basic skeleton.
Previous pull request (apache#1907) was reverted. This brings it back. Still looking into the hang. Author: Reynold Xin <rxin@apache.org> Closes apache#1971 from rxin/netty1 and squashes the following commits: b0be96f [Reynold Xin] Added test to make sure outstandingRequests are cleaned after firing the events. 4c6d0ee [Reynold Xin] Pass callbacks cleanly. 603dce7 [Reynold Xin] Upgrade Netty to 4.0.23 to fix the DefaultFileRegion bug. 88be1d4 [Reynold Xin] Downgrade to 4.0.21 to work around a bug in writing DefaultFileRegion. 002626a [Reynold Xin] Remove netty-test-file.txt. db6e6e0 [Reynold Xin] Revert "Revert "[SPARK-2468] Netty based block server / client module""
…pache#1907) Iceberg releases are now from `apple-1.3.0.x` branch, but Spark CI is still using `apple-1.3.x`. We should update to the former.
This is a rewrite of the original Netty module that was added about 1.5 years ago. The old code was turned off by default and didn't really work because it lacked a frame decoder (only worked with very very small blocks).
For this pull request, I tried to make the changes non-instrusive to the rest of Spark. I only added an init and shutdown to BlockManager/DiskBlockManager, and a bunch of comments to help me understand the existing code base.
Compared with the old Netty module, this one features:
Compared with the existing communication manager, this one features:
TODOs before it can fully replace the existing ConnectionManager, if that ever happens (most of them should probably be done in separate PRs since this needs to be turned on explicitly)
For a more comprehensive list, see https://issues.apache.org/jira/browse/SPARK-2468
Thanks to @coderplay for peer coding with me on a Sunday.