-
Notifications
You must be signed in to change notification settings - Fork 5.9k
Add Go_op, Channel_create, channel_close, channel_send and channel_receive ops #8593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Go_op, Channel_create, channel_close, channel_send and channel_receive ops #8593
Conversation
…huan_varun # Conflicts: # python/paddle/v2/fluid/__init__.py # python/paddle/v2/fluid/concurrency.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an awesome PR!
| shape=None, | ||
| dtype=None, | ||
| lod_level=None, | ||
| capacity=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to document in "Args" what is capacity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pointing this out. Updated the comments.
| self.name, self.persistable, persistable)) | ||
|
|
||
| if capacity is not None: | ||
| if is_new_var: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does every new Var need a capacity? Is it channel specific?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, capacity is specific only to a channel. I just added this information in the docstring comment also.
|
|
||
|
|
||
| class TestRoutineOp(unittest.TestCase): | ||
| def test_simple_routine(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems test_concurrecy.py is not enabled now, should we enable it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs more support in Python side to enable it. We will enable it soon when the test will be able to run successfully
|
|
||
| // Create Go Op routine | ||
| ProgramDesc goOpProgram; | ||
| BlockDesc *goOpBlock = goOpProgram.MutableBlock(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious about why you don't you get a new block from program but define goOpProgram and get the block0 of goOpProgram.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, i will update unit test
| cpu, framework::ToTypeIndex(framework::proto::VarType::BOOL)); | ||
| platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); | ||
| auto &dev_ctx = *pool.Get(dev_place); | ||
| math::set_constant(dev_ctx, status_tensor, status); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that status_tensor is on CPU side and the dimention of it is {1}, why don't set status to status_tensor, but use math::set_constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you mean this should work status_tensor[0] = true/false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you fixed it.
| framework::Executor executor(dev_place); | ||
| ExecuteOnThread(&executor, block, &new_scope); | ||
| }); | ||
| go_thread.detach(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the model has many Go_op, do you think it is necessary that synchronizing these Go_op?
I think maybe we need.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we have an issue open to create a mutex operator. If users communicate using channels, they do not need to use mutex (since channel already contains locks). Mutex op will be required if users accesses other shared memory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks.
@kavyasrinet, @cs2be, @varunarora and I worked on this PR together.
Fixes: #8426, #8436, #8427 and #8428
This PR covers the following:
This PR does not:
We are working on creating more comprehensive unit tests to better capture the CSP model.