Skip to content

[lldb] Assorted improvements to the Pipe class #128719

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions lldb/include/lldb/Host/PipeBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
#ifndef LLDB_HOST_PIPEBASE_H
#define LLDB_HOST_PIPEBASE_H

#include <chrono>
#include <string>

#include "lldb/Utility/Status.h"
#include "lldb/Utility/Timeout.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/ADT/StringRef.h"
#include "llvm/Support/Error.h"

namespace lldb_private {
class PipeBase {
Expand All @@ -32,10 +31,9 @@ class PipeBase {
virtual Status OpenAsReader(llvm::StringRef name,
bool child_process_inherit) = 0;

Status OpenAsWriter(llvm::StringRef name, bool child_process_inherit);
virtual Status
OpenAsWriterWithTimeout(llvm::StringRef name, bool child_process_inherit,
const std::chrono::microseconds &timeout) = 0;
virtual llvm::Error OpenAsWriter(llvm::StringRef name,
bool child_process_inherit,
const Timeout<std::micro> &timeout) = 0;

virtual bool CanRead() const = 0;
virtual bool CanWrite() const = 0;
Expand All @@ -56,14 +54,13 @@ class PipeBase {
// Delete named pipe.
virtual Status Delete(llvm::StringRef name) = 0;

virtual Status WriteWithTimeout(const void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_written) = 0;
Status Write(const void *buf, size_t size, size_t &bytes_written);
virtual Status ReadWithTimeout(void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_read) = 0;
Status Read(void *buf, size_t size, size_t &bytes_read);
virtual llvm::Expected<size_t>
Write(const void *buf, size_t size,
const Timeout<std::micro> &timeout = std::nullopt) = 0;

virtual llvm::Expected<size_t>
Read(void *buf, size_t size,
const Timeout<std::micro> &timeout = std::nullopt) = 0;
};
}

Expand Down
19 changes: 10 additions & 9 deletions lldb/include/lldb/Host/posix/PipePosix.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#ifndef LLDB_HOST_POSIX_PIPEPOSIX_H
#define LLDB_HOST_POSIX_PIPEPOSIX_H

#include "lldb/Host/PipeBase.h"
#include <mutex>

Expand Down Expand Up @@ -38,9 +39,8 @@ class PipePosix : public PipeBase {
llvm::SmallVectorImpl<char> &name) override;
Status OpenAsReader(llvm::StringRef name,
bool child_process_inherit) override;
Status
OpenAsWriterWithTimeout(llvm::StringRef name, bool child_process_inherit,
const std::chrono::microseconds &timeout) override;
llvm::Error OpenAsWriter(llvm::StringRef name, bool child_process_inherit,
const Timeout<std::micro> &timeout) override;

bool CanRead() const override;
bool CanWrite() const override;
Expand All @@ -64,12 +64,13 @@ class PipePosix : public PipeBase {

Status Delete(llvm::StringRef name) override;

Status WriteWithTimeout(const void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_written) override;
Status ReadWithTimeout(void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_read) override;
llvm::Expected<size_t>
Write(const void *buf, size_t size,
const Timeout<std::micro> &timeout = std::nullopt) override;

llvm::Expected<size_t>
Read(void *buf, size_t size,
const Timeout<std::micro> &timeout = std::nullopt) override;

private:
bool CanReadUnlocked() const;
Expand Down
18 changes: 9 additions & 9 deletions lldb/include/lldb/Host/windows/PipeWindows.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class PipeWindows : public PipeBase {
llvm::SmallVectorImpl<char> &name) override;
Status OpenAsReader(llvm::StringRef name,
bool child_process_inherit) override;
Status
OpenAsWriterWithTimeout(llvm::StringRef name, bool child_process_inherit,
const std::chrono::microseconds &timeout) override;
llvm::Error OpenAsWriter(llvm::StringRef name, bool child_process_inherit,
const Timeout<std::micro> &timeout) override;

bool CanRead() const override;
bool CanWrite() const override;
Expand All @@ -59,12 +58,13 @@ class PipeWindows : public PipeBase {

Status Delete(llvm::StringRef name) override;

Status WriteWithTimeout(const void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_written) override;
Status ReadWithTimeout(void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_read) override;
llvm::Expected<size_t>
Write(const void *buf, size_t size,
const Timeout<std::micro> &timeout = std::nullopt) override;

llvm::Expected<size_t>
Read(void *buf, size_t size,
const Timeout<std::micro> &timeout = std::nullopt) override;

// PipeWindows specific methods. These allow access to the underlying OS
// handle.
Expand Down
16 changes: 0 additions & 16 deletions lldb/source/Host/common/PipeBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,3 @@
using namespace lldb_private;

PipeBase::~PipeBase() = default;

Status PipeBase::OpenAsWriter(llvm::StringRef name,
bool child_process_inherit) {
return OpenAsWriterWithTimeout(name, child_process_inherit,
std::chrono::microseconds::zero());
}

Status PipeBase::Write(const void *buf, size_t size, size_t &bytes_written) {
return WriteWithTimeout(buf, size, std::chrono::microseconds::zero(),
bytes_written);
}

Status PipeBase::Read(void *buf, size_t size, size_t &bytes_read) {
return ReadWithTimeout(buf, size, std::chrono::microseconds::zero(),
bytes_read);
}
31 changes: 14 additions & 17 deletions lldb/source/Host/common/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,14 @@ Status SharedSocket::CompleteSending(lldb::pid_t child_pid) {
"WSADuplicateSocket() failed, error: %d", last_error);
}

size_t num_bytes;
Status error =
m_socket_pipe.WriteWithTimeout(&protocol_info, sizeof(protocol_info),
std::chrono::seconds(10), num_bytes);
if (error.Fail())
return error;
if (num_bytes != sizeof(protocol_info))
llvm::Expected<size_t> num_bytes = m_socket_pipe.Write(
&protocol_info, sizeof(protocol_info), std::chrono::seconds(10));
if (!num_bytes)
return Status::FromError(num_bytes.takeError());
if (*num_bytes != sizeof(protocol_info))
return Status::FromErrorStringWithFormatv(
"WriteWithTimeout(WSAPROTOCOL_INFO) failed: {0} bytes", num_bytes);
"Write(WSAPROTOCOL_INFO) failed: wrote {0}/{1} bytes", *num_bytes,
sizeof(protocol_info));
#endif
return Status();
}
Expand All @@ -113,16 +112,14 @@ Status SharedSocket::GetNativeSocket(shared_fd_t fd, NativeSocket &socket) {
WSAPROTOCOL_INFO protocol_info;
{
Pipe socket_pipe(fd, LLDB_INVALID_PIPE);
size_t num_bytes;
Status error =
socket_pipe.ReadWithTimeout(&protocol_info, sizeof(protocol_info),
std::chrono::seconds(10), num_bytes);
if (error.Fail())
return error;
if (num_bytes != sizeof(protocol_info)) {
llvm::Expected<size_t> num_bytes = socket_pipe.Read(
&protocol_info, sizeof(protocol_info), std::chrono::seconds(10));
if (!num_bytes)
return Status::FromError(num_bytes.takeError());
if (*num_bytes != sizeof(protocol_info)) {
return Status::FromErrorStringWithFormatv(
"socket_pipe.ReadWithTimeout(WSAPROTOCOL_INFO) failed: {0} bytes",
num_bytes);
"Read(WSAPROTOCOL_INFO) failed: read {0}/{1} bytes", *num_bytes,
sizeof(protocol_info));
}
}
socket = ::WSASocket(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO,
Expand Down
16 changes: 6 additions & 10 deletions lldb/source/Host/posix/ConnectionFileDescriptorPosix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,7 @@ ConnectionFileDescriptor::Connect(llvm::StringRef path,
}

bool ConnectionFileDescriptor::InterruptRead() {
size_t bytes_written = 0;
Status result = m_pipe.Write("i", 1, bytes_written);
return result.Success();
return !errorToBool(m_pipe.Write("i", 1).takeError());
}

ConnectionStatus ConnectionFileDescriptor::Disconnect(Status *error_ptr) {
Expand All @@ -205,13 +203,11 @@ ConnectionStatus ConnectionFileDescriptor::Disconnect(Status *error_ptr) {
std::unique_lock<std::recursive_mutex> locker(m_mutex, std::defer_lock);
if (!locker.try_lock()) {
if (m_pipe.CanWrite()) {
size_t bytes_written = 0;
Status result = m_pipe.Write("q", 1, bytes_written);
LLDB_LOGF(log,
"%p ConnectionFileDescriptor::Disconnect(): Couldn't get "
"the lock, sent 'q' to %d, error = '%s'.",
static_cast<void *>(this), m_pipe.GetWriteFileDescriptor(),
result.AsCString());
llvm::Error err = m_pipe.Write("q", 1).takeError();
LLDB_LOG(log,
"{0}: Couldn't get the lock, sent 'q' to {1}, error = '{2}'.",
this, m_pipe.GetWriteFileDescriptor(), err);
consumeError(std::move(err));
} else if (log) {
LLDB_LOGF(log,
"%p ConnectionFileDescriptor::Disconnect(): Couldn't get the "
Expand Down
6 changes: 1 addition & 5 deletions lldb/source/Host/posix/MainLoopPosix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,5 @@ void MainLoopPosix::Interrupt() {
return;

char c = '.';
size_t bytes_written;
Status error = m_interrupt_pipe.Write(&c, 1, bytes_written);
assert(error.Success());
UNUSED_IF_ASSERT_DISABLED(error);
assert(bytes_written == 1);
cantFail(m_interrupt_pipe.Write(&c, 1));
}
107 changes: 46 additions & 61 deletions lldb/source/Host/posix/PipePosix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
#include "lldb/Utility/SelectHelper.h"
#include "llvm/ADT/SmallString.h"
#include "llvm/Support/Errno.h"
#include "llvm/Support/Error.h"
#include <functional>
#include <system_error>
#include <thread>

#include <cerrno>
Expand Down Expand Up @@ -164,26 +166,27 @@ Status PipePosix::OpenAsReader(llvm::StringRef name,
return error;
}

Status
PipePosix::OpenAsWriterWithTimeout(llvm::StringRef name,
bool child_process_inherit,
const std::chrono::microseconds &timeout) {
llvm::Error PipePosix::OpenAsWriter(llvm::StringRef name,
bool child_process_inherit,
const Timeout<std::micro> &timeout) {
std::lock_guard<std::mutex> guard(m_write_mutex);
if (CanReadUnlocked() || CanWriteUnlocked())
return Status::FromErrorString("Pipe is already opened");
return llvm::createStringError("Pipe is already opened");

int flags = O_WRONLY | O_NONBLOCK;
if (!child_process_inherit)
flags |= O_CLOEXEC;

using namespace std::chrono;
const auto finish_time = Now() + timeout;
std::optional<time_point<steady_clock>> finish_time;
if (timeout)
finish_time = Now() + *timeout;

while (!CanWriteUnlocked()) {
if (timeout != microseconds::zero()) {
const auto dur = duration_cast<microseconds>(finish_time - Now()).count();
if (dur <= 0)
return Status::FromErrorString(
if (timeout) {
if (Now() > finish_time)
return llvm::createStringError(
std::make_error_code(std::errc::timed_out),
"timeout exceeded - reader hasn't opened so far");
}

Expand All @@ -193,7 +196,8 @@ PipePosix::OpenAsWriterWithTimeout(llvm::StringRef name,
const auto errno_copy = errno;
// We may get ENXIO if a reader side of the pipe hasn't opened yet.
if (errno_copy != ENXIO && errno_copy != EINTR)
return Status(errno_copy, eErrorTypePOSIX);
return llvm::errorCodeToError(
std::error_code(errno_copy, std::generic_category()));

std::this_thread::sleep_for(
milliseconds(OPEN_WRITER_SLEEP_TIMEOUT_MSECS));
Expand All @@ -202,7 +206,7 @@ PipePosix::OpenAsWriterWithTimeout(llvm::StringRef name,
}
}

return Status();
return llvm::Error::success();
}

int PipePosix::GetReadFileDescriptor() const {
Expand Down Expand Up @@ -300,70 +304,51 @@ void PipePosix::CloseWriteFileDescriptorUnlocked() {
}
}

Status PipePosix::ReadWithTimeout(void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_read) {
llvm::Expected<size_t> PipePosix::Read(void *buf, size_t size,
const Timeout<std::micro> &timeout) {
std::lock_guard<std::mutex> guard(m_read_mutex);
bytes_read = 0;
if (!CanReadUnlocked())
return Status(EINVAL, eErrorTypePOSIX);
return llvm::errorCodeToError(
std::make_error_code(std::errc::invalid_argument));

const int fd = GetReadFileDescriptorUnlocked();

SelectHelper select_helper;
select_helper.SetTimeout(timeout);
if (timeout)
select_helper.SetTimeout(*timeout);
select_helper.FDSetRead(fd);

Status error;
while (error.Success()) {
error = select_helper.Select();
if (error.Success()) {
auto result =
::read(fd, static_cast<char *>(buf) + bytes_read, size - bytes_read);
if (result != -1) {
bytes_read += result;
if (bytes_read == size || result == 0)
break;
} else if (errno == EINTR) {
continue;
} else {
error = Status::FromErrno();
break;
}
}
}
return error;
if (llvm::Error error = select_helper.Select().takeError())
return error;

ssize_t result = ::read(fd, buf, size);
if (result == -1)
return llvm::errorCodeToError(
std::error_code(errno, std::generic_category()));
Comment on lines +325 to +327
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for EINTR? (or RetryAfterSignal)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe that necessary because EINTR is returned when a blocking syscall is interrupted, and the select call above basically guarantees that this will not block:

       If  a signal handler is invoked while a system call or library function call is blocked, then either:
       •  the call is automatically restarted after the signal handler returns; or
       •  the call fails with the error EINTR.
       select()  allows a program to monitor multiple file descriptors, waiting until one or more of the
       file descriptors become "ready" for some class of I/O operation (e.g., input possible).   A  file
       descriptor  is considered ready if it is possible to perform a corresponding I/O operation (e.g.,
       read(2), or a sufficiently small write(2)) without blocking.

This is based on the linux documentation, but I'd expect other systems to behave the same way. That said, the check doesn't hurt much, so I'd fine with adding it if you think it helps.


return result;
}

Status PipePosix::WriteWithTimeout(const void *buf, size_t size,
const std::chrono::microseconds &timeout,
size_t &bytes_written) {
llvm::Expected<size_t> PipePosix::Write(const void *buf, size_t size,
const Timeout<std::micro> &timeout) {
std::lock_guard<std::mutex> guard(m_write_mutex);
bytes_written = 0;
if (!CanWriteUnlocked())
return Status(EINVAL, eErrorTypePOSIX);
return llvm::errorCodeToError(
std::make_error_code(std::errc::invalid_argument));

const int fd = GetWriteFileDescriptorUnlocked();
SelectHelper select_helper;
select_helper.SetTimeout(timeout);
if (timeout)
select_helper.SetTimeout(*timeout);
select_helper.FDSetWrite(fd);

Status error;
while (error.Success()) {
error = select_helper.Select();
if (error.Success()) {
auto result = ::write(fd, static_cast<const char *>(buf) + bytes_written,
size - bytes_written);
if (result != -1) {
bytes_written += result;
if (bytes_written == size)
break;
} else if (errno == EINTR) {
continue;
} else {
error = Status::FromErrno();
}
}
}
return error;
if (llvm::Error error = select_helper.Select().takeError())
return error;

ssize_t result = ::write(fd, buf, size);
if (result == -1)
return llvm::errorCodeToError(
std::error_code(errno, std::generic_category()));
Comment on lines +349 to +351
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for EINTR? (or RetryAfterSignal)?


return result;
}
Loading