Skip to content

Commit

Permalink
Call SendMsgParamsCallback::wroteBytes on successful writes
Browse files Browse the repository at this point in the history
Summary:
Rationale from the docblock:

> Required to enable "exactly once" transmission of ancillary data corresponding to `writeTag`.  For example, `AsyncFdSocket` ought not transmit tag-associated FDs twice.

Reviewed By: AkramaMirza

Differential Revision: D44348249

fbshipit-source-id: 1c33ec46b2f13c3862e3c7ac27299de364641705
  • Loading branch information
snarkmaster authored and facebook-github-bot committed Apr 1, 2023
1 parent c8e0303 commit 4076522
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 16 deletions.
32 changes: 18 additions & 14 deletions folly/io/async/AsyncSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3574,21 +3574,25 @@ AsyncSocket::WriteResult AsyncSocket::sendSocketMessage(
writeResult = sendSocketMessage(fd_, &msg, msg_flags);
}

if (writeResult.writeReturn > 0 && byteEventsEnabled &&
isSet(flagsL, WriteFlags::TIMESTAMP_WRITE)) {
CHECK_GT(getRawBytesWritten(), prewriteRawBytesWritten); // sanity check
ByteEvent byteEvent = {};
byteEvent.type = ByteEvent::Type::WRITE;
byteEvent.offset = getRawBytesWritten() - 1;
byteEvent.maybeRawBytesWritten = writeResult.writeReturn;
byteEvent.maybeRawBytesTriedToWrite = 0;
for (size_t i = 0; i < countL; ++i) {
byteEvent.maybeRawBytesTriedToWrite.value() += vecL[i].iov_len;
if (writeResult.writeReturn > 0) {
if (msg.msg_controllen != 0) {
sendMsgParamCallback_->wroteBytes(writeTag);
}
byteEvent.maybeWriteFlags = flagsL;
for (const auto& observer : lifecycleObservers_) {
if (observer->getConfig().byteEvents) {
observer->byteEvent(this, byteEvent);
if (byteEventsEnabled && isSet(flagsL, WriteFlags::TIMESTAMP_WRITE)) {
CHECK_GT(getRawBytesWritten(), prewriteRawBytesWritten); // sanity check
ByteEvent byteEvent = {};
byteEvent.type = ByteEvent::Type::WRITE;
byteEvent.offset = getRawBytesWritten() - 1;
byteEvent.maybeRawBytesWritten = writeResult.writeReturn;
byteEvent.maybeRawBytesTriedToWrite = 0;
for (size_t i = 0; i < countL; ++i) {
byteEvent.maybeRawBytesTriedToWrite.value() += vecL[i].iov_len;
}
byteEvent.maybeWriteFlags = flagsL;
for (const auto& observer : lifecycleObservers_) {
if (observer->getConfig().byteEvents) {
observer->byteEvent(this, byteEvent);
}
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions folly/io/async/AsyncSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ class AsyncSocket : public AsyncSocketTransport {
* sockets.
*
* This opaque type acts as the key to match `writeChain` calls with
* `getAncillaryData()` calls. It wraps `IOBuf*`, and implements
* equality, hashing, and ostream writes for debugging.
* `getAncillaryData()` and corresponding `wroteBytes()` calls. It wraps
* `IOBuf*`, and implements equality, hashing, and ostream writes for
* debugging.
*
* Important usage notes:
* - Even though `WriteRequestTag` never dereferences the pointer, it is
Expand Down Expand Up @@ -306,6 +307,17 @@ class AsyncSocket : public AsyncSocketTransport {
const WriteRequestTag& writeTag,
const bool byteEventsEnabled = false) noexcept;

/**
* Called immediately after a `sendmsg` corresponding to the preceding
* `getAncillaryData()` successfully sends at least 1 byte.
*
* This is required to enable "exactly once" transmission of ancillary
* data corresponding to `writeTag`. For example, `AsyncFdSocket` ought
* not transmit tag-associated FDs twice. Per POSIX, ancillary data are
* transmitted together with the first data byte.
*/
virtual void wroteBytes(const WriteRequestTag&) noexcept {}

// This is not an OS limitation (see `/proc/sys/net/core/optmem_max` on
// Linux) but is done only because today's `AsyncSocket` implementation
// uses `alloca` to allocate the ancillary data buffer on the stack in
Expand Down
7 changes: 7 additions & 0 deletions folly/io/async/test/AsyncSocketTest.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,12 @@ class TestSendMsgParamsCallback
return dataSize_;
}

void wroteBytes(
const folly::AsyncSocket::WriteRequestTag& tag) noexcept override {
CHECK_EQ(tag, expectedTag_);
tagLastWritten_ = tag;
}

int flags_;
folly::WriteFlags writeFlags_;
uint32_t dataSize_;
Expand All @@ -507,6 +513,7 @@ class TestSendMsgParamsCallback
bool queriedData_;
folly::AsyncSocket::WriteRequestTag expectedTag_{
folly::AsyncSocket::WriteRequestTag::EmptyDummy()};
std::optional<folly::AsyncSocket::WriteRequestTag> tagLastWritten_;
};

class TestServer {
Expand Down
2 changes: 2 additions & 0 deletions folly/io/async/test/AsyncSocketTest2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8499,9 +8499,11 @@ TEST(AsyncSocketTest, SendMessageAncillaryData) {
auto ioBuf = folly::IOBuf::wrapBuffer(&s_data, sizeof(s_data));
sendMsgCB.expectedTag_ = folly::AsyncSocket::WriteRequestTag{
ioBuf.get()}; // Also test write tagging.
ASSERT_FALSE(sendMsgCB.tagLastWritten_.has_value());
socket->writeChain(&wcb, std::move(ioBuf));
ASSERT_EQ(wcb.state, STATE_SUCCEEDED);
ASSERT_TRUE(sendMsgCB.queriedData_); // Did the tag check run?
ASSERT_EQ(sendMsgCB.expectedTag_, *sendMsgCB.tagLastWritten_);

// Receive the message
union {
Expand Down

0 comments on commit 4076522

Please sign in to comment.