Skip to content

Conversation

@abhinavarora
Copy link
Contributor

@abhinavarora abhinavarora commented Feb 26, 2018

@kavyasrinet, @cs2be, @varunarora and I worked on this PR together.
Fixes: #8426, #8436, #8427 and #8428

This PR covers the following:

  1. Create operators for channel_create, channel_send, channel_receive and channel_close.
  2. Implement Go op.
  3. Unit test to check an end-to-end implementation of CSP.

This PR does not:

  1. Support CSP for training purposes.(i.e. implement back-propagation using CSP.). We will investigate this in future PRs.

We are working on creating more comprehensive unit tests to better capture the CSP model.

Abhinav Arora and others added 30 commits February 12, 2018 17:32
…huan_varun

# Conflicts:
#	python/paddle/v2/fluid/__init__.py
#	python/paddle/v2/fluid/concurrency.py
@abhinavarora abhinavarora changed the title [WIP] Add Go_op, Channel_create, channel_close, channel_send and channel_receive ops Add Go_op, Channel_create, channel_close, channel_send and channel_receive ops Feb 27, 2018
wangkuiyi
wangkuiyi previously approved these changes Feb 27, 2018
Copy link
Collaborator

@wangkuiyi wangkuiyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an awesome PR!

shape=None,
dtype=None,
lod_level=None,
capacity=None,
Copy link
Contributor

@panyx0718 panyx0718 Feb 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to document in "Args" what is capacity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing this out. Updated the comments.

self.name, self.persistable, persistable))

if capacity is not None:
if is_new_var:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does every new Var need a capacity? Is it channel specific?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, capacity is specific only to a channel. I just added this information in the docstring comment also.



class TestRoutineOp(unittest.TestCase):
def test_simple_routine(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems test_concurrecy.py is not enabled now, should we enable it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs more support in Python side to enable it. We will enable it soon when the test will be able to run successfully


// Create Go Op routine
ProgramDesc goOpProgram;
BlockDesc *goOpBlock = goOpProgram.MutableBlock(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious about why you don't you get a new block from program but define goOpProgram and get the block0 of goOpProgram.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, i will update unit test

cpu, framework::ToTypeIndex(framework::proto::VarType::BOOL));
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(dev_place);
math::set_constant(dev_ctx, status_tensor, status);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that status_tensor is on CPU side and the dimention of it is {1}, why don't set status to status_tensor, but use math::set_constant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean this should work status_tensor[0] = true/false?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you fixed it.

framework::Executor executor(dev_place);
ExecuteOnThread(&executor, block, &new_scope);
});
go_thread.detach();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the model has many Go_op, do you think it is necessary that synchronizing these Go_op?
I think maybe we need.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we have an issue open to create a mutex operator. If users communicate using channels, they do not need to use mutex (since channel already contains locks). Mutex op will be required if users accesses other shared memory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks.

@abhinavarora abhinavarora merged commit 0d878e4 into PaddlePaddle:develop Feb 28, 2018
@abhinavarora abhinavarora deleted the go_op_thuan_varun branch February 28, 2018 21:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement GoOp

9 participants