Skip to content

Commit

Permalink
Mojo: Add a "secondary buffer" to MessageInTransit.
Browse files Browse the repository at this point in the history
I think I really want to move away from "unowned buffer"
MessageInTransits, but I'll do that refactor after this change (since
it'd conflict badly with this change). Also, the format of the secondary
buffer isn't finalized yet.

R=yzshen@chromium.org

Review URL: https://codereview.chromium.org/177123011

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@253633 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
viettrungluu@chromium.org committed Feb 26, 2014
1 parent a881c45 commit 71a7ad9
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 59 deletions.
115 changes: 94 additions & 21 deletions mojo/system/message_in_transit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,15 @@ MessageInTransit::MessageInTransit(OwnedBuffer,
uint32_t num_bytes,
uint32_t num_handles,
const void* bytes)
: owns_main_buffer_(true),
: owns_buffers_(true),
main_buffer_size_(RoundUpMessageAlignment(sizeof(Header) + num_bytes)),
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)) {
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)),
secondary_buffer_size_(0),
secondary_buffer_(NULL) {
DCHECK_LE(num_bytes, kMaxMessageNumBytes);
DCHECK_LE(num_handles, kMaxMessageNumHandles);

// Note: If dispatchers are subsequently attached (in particular, if
// |num_handles| is nonzero), then |total_size| will have to be adjusted.
// TODO(vtl): Figure out where we should really set this.
header()->total_size = static_cast<uint32_t>(main_buffer_size_);
// |total_size| is updated below, from the other values.
header()->type = type;
header()->subtype = subtype;
header()->source_id = kInvalidEndpointId;
Expand All @@ -66,6 +65,9 @@ MessageInTransit::MessageInTransit(OwnedBuffer,
header()->num_handles = num_handles;
header()->reserved0 = 0;
header()->reserved1 = 0;
// Note: If dispatchers are subsequently attached (in particular, if
// |num_handles| is nonzero), then |total_size| will have to be adjusted.
UpdateTotalSize();

if (bytes) {
memcpy(MessageInTransit::bytes(), bytes, num_bytes);
Expand All @@ -78,41 +80,62 @@ MessageInTransit::MessageInTransit(OwnedBuffer,

MessageInTransit::MessageInTransit(OwnedBuffer,
const MessageInTransit& other)
: owns_main_buffer_(true),
: owns_buffers_(true),
main_buffer_size_(other.main_buffer_size()),
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)) {
main_buffer_(base::AlignedAlloc(main_buffer_size_, kMessageAlignment)),
secondary_buffer_size_(other.secondary_buffer_size()),
secondary_buffer_(secondary_buffer_size_ ?
base::AlignedAlloc(secondary_buffer_size_,
kMessageAlignment) : NULL) {
DCHECK(!other.dispatchers_.get());
DCHECK_GE(main_buffer_size_, sizeof(Header));
DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u);

memcpy(main_buffer_, other.main_buffer_, main_buffer_size_);
memcpy(main_buffer_, other.main_buffer(), main_buffer_size_);
memcpy(secondary_buffer_, other.secondary_buffer(), secondary_buffer_size_);

// TODO(vtl): This will change.
DCHECK_EQ(main_buffer_size_,
RoundUpMessageAlignment(sizeof(Header) + num_bytes()));
}

MessageInTransit::MessageInTransit(UnownedBuffer,
size_t message_size,
void* buffer)
: owns_main_buffer_(false),
main_buffer_size_(message_size),
main_buffer_(buffer) {
DCHECK_GE(main_buffer_size_, sizeof(Header));
DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u);
DCHECK(main_buffer_);
: owns_buffers_(false),
main_buffer_size_(0),
main_buffer_(NULL),
secondary_buffer_size_(0),
secondary_buffer_(NULL) {
DCHECK_GE(message_size, sizeof(Header));
DCHECK_EQ(message_size % kMessageAlignment, 0u);
DCHECK(buffer);

Header* header = static_cast<Header*>(buffer);
DCHECK_EQ(header->total_size, message_size);

main_buffer_size_ =
RoundUpMessageAlignment(sizeof(Header) + header->num_bytes);
DCHECK_LE(main_buffer_size_, message_size);
main_buffer_ = buffer;
DCHECK_EQ(reinterpret_cast<uintptr_t>(main_buffer_) % kMessageAlignment, 0u);
// TODO(vtl): This will change.
DCHECK_EQ(main_buffer_size_,
RoundUpMessageAlignment(sizeof(Header) + num_bytes()));

if (message_size > main_buffer_size_) {
secondary_buffer_size_ = message_size - main_buffer_size_;
secondary_buffer_ = static_cast<char*>(buffer) + main_buffer_size_;
DCHECK_EQ(reinterpret_cast<uintptr_t>(secondary_buffer_) %
kMessageAlignment, 0u);
}
}

MessageInTransit::~MessageInTransit() {
if (owns_main_buffer_) {
if (owns_buffers_) {
base::AlignedFree(main_buffer_);
base::AlignedFree(secondary_buffer_); // Okay if null.
#ifndef NDEBUG
main_buffer_size_ = 0;
main_buffer_ = NULL;
secondary_buffer_size_ = 0;
secondary_buffer_ = NULL;
#endif
}

Expand Down Expand Up @@ -149,7 +172,7 @@ bool MessageInTransit::GetNextMessageSize(const void* buffer,
void MessageInTransit::SetDispatchers(
scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers) {
DCHECK(dispatchers.get());
DCHECK(owns_main_buffer_);
DCHECK(owns_buffers_);
DCHECK(!dispatchers_.get());

dispatchers_ = dispatchers.Pass();
Expand All @@ -159,5 +182,55 @@ void MessageInTransit::SetDispatchers(
#endif
}

void MessageInTransit::SerializeAndCloseDispatchers(Channel* channel) {
DCHECK(channel);
DCHECK(owns_buffers_);
DCHECK(!secondary_buffer_);
CHECK_EQ(num_handles(),
dispatchers_.get() ? dispatchers_->size() : static_cast<size_t>(0));

if (!num_handles())
return;

// The size of the secondary buffer. We'll start with the size of the entry
// size table (which will contain the size of the data for each handle), and
// add to it as we go along.
size_t size = RoundUpMessageAlignment(num_handles() * sizeof(uint32_t));
// The maximum size that we'll need for the secondary buffer. We'll allocate
// this much.
size_t max_size = size;
// TODO(vtl): Iterate through dispatchers and query their maximum size (and
// add each, rounded up, to |max_size|).

secondary_buffer_ = base::AlignedAlloc(max_size, kMessageAlignment);
// TODO(vtl): I wonder if it's faster to clear everything once, or to only
// clear padding as needed.
memset(secondary_buffer_, 0, max_size);

uint32_t* entry_size_table = static_cast<uint32_t*>(secondary_buffer_);
for (size_t i = 0; i < dispatchers_->size(); i++) {
// The entry size table entry is already zero by default.
if (!(*dispatchers_)[i])
continue;

// TODO(vtl): Serialize this dispatcher (getting its actual size, and adding
// that (rounded up) to |size|.
entry_size_table[i] = 0;

DCHECK((*dispatchers_)[i]->HasOneRef());
(*dispatchers_)[i]->Close();
}

secondary_buffer_size_ = static_cast<uint32_t>(size);
UpdateTotalSize();
}

void MessageInTransit::UpdateTotalSize() {
DCHECK_EQ(main_buffer_size_ % kMessageAlignment, 0u);
DCHECK_EQ(secondary_buffer_size_ % kMessageAlignment, 0u);
header()->total_size =
static_cast<uint32_t>(main_buffer_size_ + secondary_buffer_size_);
}

} // namespace system
} // namespace mojo
80 changes: 53 additions & 27 deletions mojo/system/message_in_transit.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
namespace mojo {
namespace system {

class Channel;

// This class is used to represent data in transit. It is thread-unsafe.
class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
public:
Expand Down Expand Up @@ -87,40 +89,56 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
// Makes this message "own" the given set of dispatchers. The dispatchers must
// not be referenced from anywhere else (in particular, not from the handle
// table), i.e., each dispatcher must have a reference count of 1. This
// message must also own its main buffer and not already have dispatchers.
// message must also own its buffers and not already have dispatchers.
void SetDispatchers(
scoped_ptr<std::vector<scoped_refptr<Dispatcher> > > dispatchers);

// Gets the "main" buffer for a |MessageInTransit|. A |MessageInTransit| can
// be serialized by writing the main buffer. The returned pointer will be
// aligned to a multiple of |kMessageAlignment| bytes, and the size of the
// buffer (see below) will also be a multiple of |kMessageAlignment|.
// Serializes any dispatchers to the secondary buffer. This message must own
// its buffers (in order to have dispatchers in the first place), and the
// secondary buffer must not yet exist (so this must only be called once). The
// caller must ensure (e.g., by holding on to a reference) that |channel|
// stays alive through the call.
void SerializeAndCloseDispatchers(Channel* channel);

// |MessageInTransit| buffers: A |MessageInTransit| can be serialized by
// writing the main buffer and then, if it has one, the secondary buffer. Both
// buffers are |kMessageAlignment|-byte aligned and a multiple of
// |kMessageAlignment| bytes in size.
//
// The main buffer consists of the header (of type |Header|, which is an
// internal detail of this class) followed immediately by the message data
// (accessed by |bytes()| and of size |num_bytes()|, and also
// |kMessageAlignment|-byte aligned), and then any padding needed to make the
// main buffer a multiple of |kMessageAlignment| bytes in size.
//
// The main buffer always consists of the header (of type |Header|, which is
// an internal detail), followed by the message data, accessed by |bytes()|
// (of size |num_bytes()|, and also |kMessageAlignment|-aligned), and then any
// necessary padding to make |main_buffer_size()| a multiple of
// |kMessageAlignment|.
// TODO(vtl): Add a "secondary" buffer, so that this makes more sense.
// The secondary buffer consists first of a table of |uint32_t|s with
// |num_handles()| entries; each entry in the table is the (unpadded) size of
// the data for a handle (a size of 0 indicates in invalid handle/null
// dispatcher). The table is followed by padding, then the first entry and
// padding, the second entry and padding, etc. (padding as required to
// maintain |kMessageAlignment|-byte alignment).

// Gets the main buffer and its size (in number of bytes), respectively.
const void* main_buffer() const { return main_buffer_; }

// Gets the size of the main buffer (in number of bytes).
size_t main_buffer_size() const { return main_buffer_size_; }

// Gets the secondary buffer and its size (in number of bytes), respectively.
const void* secondary_buffer() const { return secondary_buffer_; }
size_t secondary_buffer_size() const { return secondary_buffer_size_; }

// Gets the total size of the message (see comment in |Header|, below).
size_t total_size() const { return header()->total_size; }

// Gets the size of the message data.
uint32_t num_bytes() const {
return header()->num_bytes;
}
uint32_t num_bytes() const { return header()->num_bytes; }

// Gets the message data (of size |num_bytes()| bytes).
const void* bytes() const {
return static_cast<const char*>(main_buffer_) + sizeof(Header);
}
void* bytes() { return static_cast<char*>(main_buffer_) + sizeof(Header); }

uint32_t num_handles() const {
return header()->num_handles;
}
uint32_t num_handles() const { return header()->num_handles; }

Type type() const { return header()->type; }
Subtype subtype() const { return header()->subtype; }
Expand All @@ -139,8 +157,6 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
return dispatchers_.get();
}

// TODO(vtl): Add whatever's necessary to transport handles.

// Rounds |n| up to a multiple of |kMessageAlignment|.
static inline size_t RoundUpMessageAlignment(size_t n) {
return (n + kMessageAlignment - 1) & ~(kMessageAlignment - 1);
Expand All @@ -155,12 +171,14 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
struct Header {
// Total size of the message, including the header, the message data
// ("bytes") including padding (to make it a multiple of |kMessageAlignment|
// bytes), and serialized handle information (TODO(vtl)).
// bytes), and serialized handle information. Note that this may not be the
// correct value if dispatchers are attached but
// |SerializeAndCloseDispatchers()| has not been called.
uint32_t total_size;
Type type;
Subtype subtype;
EndpointId source_id;
EndpointId destination_id;
Type type; // 2 bytes.
Subtype subtype; // 2 bytes.
EndpointId source_id; // 4 bytes.
EndpointId destination_id; // 4 bytes.
// Size of actual message data.
uint32_t num_bytes;
// Number of handles "attached".
Expand All @@ -175,10 +193,18 @@ class MOJO_SYSTEM_IMPL_EXPORT MessageInTransit {
}
Header* header() { return static_cast<Header*>(main_buffer_); }

bool owns_main_buffer_;
void UpdateTotalSize();

// Whether we own |main_buffer_| and |secondary_buffer_| or not (that is, we
// own neither).
bool owns_buffers_;

size_t main_buffer_size_;
void* main_buffer_;

size_t secondary_buffer_size_;
void* secondary_buffer_; // May be null.

// Any dispatchers that may be attached to this message. This is only
// supported if this message owns its main buffer. These dispatchers should be
// "owned" by this message, i.e., have a ref count of exactly 1. (We allow a
Expand Down
5 changes: 2 additions & 3 deletions mojo/system/proxy_message_pipe_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,10 @@ void ProxyMessagePipeEndpoint::EnqueueMessage(
}

if (is_running()) {
message->SerializeAndCloseDispatchers(channel_.get());

message->set_source_id(local_id_);
message->set_destination_id(remote_id_);
// If it fails at this point, the message gets dropped. (This is no
// different from any other in-transit errors.)
// Note: |WriteMessage()| will destroy the message even on failure.
if (!channel_->WriteMessage(message.Pass()))
LOG(WARNING) << "Failed to write message to channel";
} else {
Expand Down
Loading

0 comments on commit 71a7ad9

Please sign in to comment.