Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed #9590 - ProcessExecutor: read the process pipe until we have all the expected data / greatly improved error handling #4859

Merged
merged 5 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/cppcheckexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ void CppCheckExecutor::reportErr(const std::string &errmsg)

void CppCheckExecutor::reportOut(const std::string &outmsg, Color c)
{
// TODO: do not unconditionally apply colors
std::cout << c << ansiToOEM(outmsg, true) << Color::Reset << std::endl;
}

Expand Down
92 changes: 65 additions & 27 deletions cli/processexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class PipeWriter : public ErrorLogger {
explicit PipeWriter(int pipe) : mWpipe(pipe) {}

void reportOut(const std::string &outmsg, Color c) override {
// TODO: do not unconditionally apply colors
writeToPipe(REPORT_OUT, ::toString(c) + outmsg + ::toString(Color::Reset));
}

Expand All @@ -85,17 +86,29 @@ class PipeWriter : public ErrorLogger {
}

private:
// TODO: how to log file name in error?
void writeToPipe(PipeSignal type, const std::string &data) const
{
unsigned int len = static_cast<unsigned int>(data.length() + 1);
char *out = new char[len + 1 + sizeof(len)];
out[0] = static_cast<char>(type);
std::memcpy(&(out[1]), &len, sizeof(len));
std::memcpy(&(out[1+sizeof(len)]), data.c_str(), len);
if (write(mWpipe, out, len + 1 + sizeof(len)) <= 0) {

std::size_t bytes_to_write = len + 1 + sizeof(len);
ssize_t bytes_written = write(mWpipe, out, len + 1 + sizeof(len));
if (bytes_written <= 0) {
const int err = errno;
delete[] out;
out = nullptr;
std::cerr << "#### ThreadExecutor::writeToPipe() error for type " << type << ": " << std::strerror(err) << std::endl;
std::exit(EXIT_FAILURE);
}
// TODO: write until everything is written
if (bytes_written != bytes_to_write) {
delete[] out;
out = nullptr;
std::cerr << "#### ThreadExecutor::writeToPipe, Failed to write to pipe" << std::endl;
std::cerr << "#### ThreadExecutor::writeToPipe() error for type " << type << ": insufficient data written (expected: " << bytes_to_write << " / got: " << bytes_written << ")" << std::endl;
std::exit(EXIT_FAILURE);
}

Expand All @@ -105,63 +118,85 @@ class PipeWriter : public ErrorLogger {
const int mWpipe;
};

int ProcessExecutor::handleRead(int rpipe, unsigned int &result)
bool ProcessExecutor::handleRead(int rpipe, unsigned int &result, const std::string& filename)
{
std::size_t bytes_to_read;
ssize_t bytes_read;

char type = 0;
if (read(rpipe, &type, 1) <= 0) {
bytes_to_read = sizeof(char);
bytes_read = read(rpipe, &type, bytes_to_read);
if (bytes_read <= 0) {
if (errno == EAGAIN)
return 0;
return true;

// TODO: log details about failure

// need to increment so a missing pipe (i.e. premature exit of forked process) results in an error exitcode
++result;
return -1;
return false;
}
if (bytes_read != bytes_to_read) {
std::cerr << "#### ThreadExecutor::handleRead(" << filename << ") error (type): insufficient data read (expected: " << bytes_to_read << " / got: " << bytes_read << ")" << std::endl;
std::exit(EXIT_FAILURE);
}

if (type != PipeWriter::REPORT_OUT && type != PipeWriter::REPORT_ERROR && type != PipeWriter::CHILD_END) {
std::cerr << "#### ThreadExecutor::handleRead error, type was:" << type << std::endl;
std::cerr << "#### ThreadExecutor::handleRead(" << filename << ") invalid type " << int(type) << std::endl;
std::exit(EXIT_FAILURE);
}

unsigned int len = 0;
if (read(rpipe, &len, sizeof(len)) <= 0) {
std::cerr << "#### ThreadExecutor::handleRead error, type was:" << type << std::endl;
bytes_to_read = sizeof(len);
bytes_read = read(rpipe, &len, bytes_to_read);
if (bytes_read <= 0) {
const int err = errno;
std::cerr << "#### ThreadExecutor::handleRead(" << filename << ") error (len) for type " << int(type) << ": " << std::strerror(err) << std::endl;
std::exit(EXIT_FAILURE);
}
if (bytes_read != bytes_to_read) {
std::cerr << "#### ThreadExecutor::handleRead(" << filename << ") error (len) for type" << int(type) << ": insufficient data read (expected: " << bytes_to_read << " / got: " << bytes_read << ")" << std::endl;
std::exit(EXIT_FAILURE);
}

// Don't rely on incoming data being null-terminated.
// Allocate +1 element and null-terminate the buffer.
char *buf = new char[len + 1];
const ssize_t readIntoBuf = read(rpipe, buf, len);
if (readIntoBuf <= 0) {
std::cerr << "#### ThreadExecutor::handleRead error, type was:" << type << std::endl;
std::exit(EXIT_FAILURE);
}
buf[readIntoBuf] = 0;
char *data_start = buf;
bytes_to_read = len;
do {
bytes_read = read(rpipe, data_start, bytes_to_read);
if (bytes_read <= 0) {
const int err = errno;
std::cerr << "#### ThreadExecutor::handleRead(" << filename << ") error (buf) for type" << int(type) << ": " << std::strerror(err) << std::endl;
std::exit(EXIT_FAILURE);
}
bytes_to_read -= bytes_read;
data_start += bytes_read;
} while (bytes_to_read != 0);
buf[len] = 0;

bool res = true;
if (type == PipeWriter::REPORT_OUT) {
mErrorLogger.reportOut(buf);
} else if (type == PipeWriter::REPORT_ERROR) {
ErrorMessage msg;
try {
msg.deserialize(buf);
} catch (const InternalError& e) {
std::cerr << "#### ThreadExecutor::handleRead error, internal error:" << e.errorMessage << std::endl;
std::cerr << "#### ThreadExecutor::handleRead(" << filename << ") internal error: " << e.errorMessage << std::endl;
std::exit(EXIT_FAILURE);
}

if (hasToLog(msg))
mErrorLogger.reportErr(msg);
} else if (type == PipeWriter::CHILD_END) {
std::istringstream iss(buf);
unsigned int fileResult = 0;
iss >> fileResult;
result += fileResult;
delete[] buf;
return -1;
result += std::stoi(buf);
res = false;
}

delete[] buf;
return 1;
return res;
}

bool ProcessExecutor::checkLoadAverage(size_t nchildren)
Expand Down Expand Up @@ -274,12 +309,15 @@ unsigned int ProcessExecutor::check()
std::list<int>::iterator rp = rpipes.begin();
while (rp != rpipes.end()) {
if (FD_ISSET(*rp, &rfds)) {
const int readRes = handleRead(*rp, result);
if (readRes == -1) {
std::string name;
const std::map<int, std::string>::iterator p = pipeFile.find(*rp);
if (p != pipeFile.end()) {
name = p->second;
}
const bool readRes = handleRead(*rp, result, name);
if (!readRes) {
std::size_t size = 0;
const std::map<int, std::string>::iterator p = pipeFile.find(*rp);
if (p != pipeFile.end()) {
std::string name = p->second;
pipeFile.erase(p);
const std::map<std::string, std::size_t>::const_iterator fs = mFiles.find(name);
if (fs != mFiles.end()) {
Expand Down
6 changes: 2 additions & 4 deletions cli/processexecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ class ProcessExecutor : public Executor {
private:
/**
* Read from the pipe, parse and handle what ever is in there.
*@return -1 in case of error
* 0 if there is nothing in the pipe to be read
* 1 if we did read something
* @return False in case of an recoverable error - will exit process on others
*/
int handleRead(int rpipe, unsigned int &result);
bool handleRead(int rpipe, unsigned int &result, const std::string& filename);

/**
* @brief Check load average condition
Expand Down