Skip to content

Add RcppThread::Rcerr #60

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
Provides R-friendly threading functionality:

* thread safe versions of [Rcpp's](http://www.rcpp.org/)
`checkUserInterrupt()` and `Rcout`,
`checkUserInterrupt()`, `Rcout`, and `Rcerr`,
* an interruptible thread class that otherwise behaves like
[`std::thread`](http://en.cppreference.com/w/cpp/thread/thread),
* classes for the [thread pool
Expand All @@ -29,6 +29,8 @@ or the [API documentation](https://tnagler.github.io/RcppThread/).

Since then, the following **new features** have been added:

- Printing to the error stream with `Rcerr`.

- Free-standing functions like `parallelFor()` now dispatch
to a global thread pool that persists for the entire session. This
significantly speeds up programs that repeatedly call these functions.
Expand Down Expand Up @@ -99,13 +101,14 @@ before including any headers in your source code.
1. Add the line `CXX_STD = CXX11` to the `src/Makevars(.win)` files of your package.
2. Add `RcppThread` to the `LinkingTo` field of your `DESCRIPTION` file.

## Automatic override of `std::cout` and `std::thread`
## Automatic override of `std::cout`, `std::cerr``, and `std::thread`

There are preprocessor options to replace all occurrences of `std::cout` and
`std::thread` with calls to `RcppThread::Rcout` and `RcppThread::Thread`
There are preprocessor options to replace all occurrences of `std::cout`, `std::cerr`, and `std::thread` with calls to `RcppThread::Rcout`, `RcppThread::Rcerr`, and `RcppThread::Thread`
(provided that the RcppThread headers are included first). To enable this, use

```
#define RCPPTHREAD_OVERRIDE_COUT 1 // std::cout override
#define RCPPTHREAD_OVERRIDE_CERR 1 // std::cerr override
#define RCPPTHREAD_OVERRIDE_THREAD 1 // std::thread override
```
before including the RcppThread headers.
Expand Down
1 change: 1 addition & 0 deletions inst/include/RcppThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "RcppThread/RMonitor.hpp"
#include "RcppThread/Rcout.hpp"
#include "RcppThread/Rcerr.hpp"
#include "RcppThread/Thread.hpp"
#include "RcppThread/ThreadPool.hpp"
#include "RcppThread/parallelFor.hpp"
Expand Down
19 changes: 19 additions & 0 deletions inst/include/RcppThread/RMonitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class RMonitor {
// user-facing functionality must be friends, so they can access
// protected members of RMonitor.
friend class RPrinter;
friend class RErrPrinter;
friend void checkUserInterrupt(bool condition);
friend bool isInterrupted(bool condition);

Expand Down Expand Up @@ -100,6 +101,23 @@ class RMonitor {
}
}

//! prints `object` to R error stream íf called from main thread; otherwise
//! adds a printable version of `object` to a buffer for deferred printing.
//! @param object a string or coercible object to print.
template<class T>
void safelyPrintErr(const T& object)
{
std::lock_guard<std::mutex> lk(m_);
msgsErr_ << object;
if ( calledFromMainThread() && (msgsErr_.str() != std::string("")) ) {
// release messages in buffer
REprintf("%s", msgsErr_.str().c_str());
//R_FlushConsole();
// clear message buffer
msgsErr_.str("");
}
}

private:
//! Ctors declared private, to instantiate class use `::instance()`.
RMonitor(void) : isInterrupted_(false) {}
Expand All @@ -119,6 +137,7 @@ class RMonitor {

std::mutex m_; // mutex for synchronized r/w
std::stringstream msgs_; // string buffer
std::stringstream msgsErr_; // string buffer for stderr
std::atomic_bool isInterrupted_;
};

Expand Down
66 changes: 66 additions & 0 deletions inst/include/RcppThread/Rcerr.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright © 2021 Thomas Nagler
//
// This file is part of the RcppThread and licensed under the terms of
// the MIT license. For a copy, see the LICENSE.md file in the root directory of
// RcppThread or https://github.com/tnagler/RcppThread/blob/master/LICENSE.md.

#pragma once

#include <ostream>
#include "RcppThread/RMonitor.hpp"

namespace RcppThread {

//! Safely printing to the R console from threaded code.
class RErrPrinter {
public:

//! prints `object` to R error stream íf called from main thread; otherwise
//! adds a printable version of `object` to a buffer for deferred printing.
//! @param object a string (or coercible object) to print.
//! @details Declared as a friend in `RMonitor`.
template<class T>
RErrPrinter& operator<< (T& object)
{
RMonitor::instance().safelyPrintErr(object);
return *this;
}

//! prints `object` to R error stream íf called from main thread; otherwise
//! adds a printable version of `object` to a buffer for deferred printing.
//! @param object a string (or coercible object) to print.
//! @details Declared as a friend in `RMonitor`.
template<class T>
RErrPrinter& operator<< (const T& object)
{
RMonitor::instance().safelyPrintErr(object);
return *this;
}

//! prints `object` to R error stream íf called from main thread; otherwise
//! adds a printable version of `object` to a buffer for deferred printing.
//! @param object a string (or coercible object) to print.
//! @details Declared as a friend in `RMonitor`.
RErrPrinter& operator<< (std::ostream& (*object)(std::ostream&))
{
RMonitor::instance().safelyPrintErr(object);
return *this;
}
};

//! global `RPrinter` instance called 'Rcerr' (as in Rcpp).
static RErrPrinter Rcerr = RErrPrinter();

}

// override std::cout to use RcppThread::Rcout instead
#ifndef RCPPTHREAD_OVERRIDE_CERR
#define RCPPTHREAD_OVERRIDE_CERR 0
#endif

#if RCPPTHREAD_OVERRIDE_CERR
#define cerr RcppThreadRcerr
namespace std {
static RcppThread::RErrPrinter RcppThreadRcerr = RcppThread::RErrPrinter();
}
#endif
3 changes: 3 additions & 0 deletions inst/include/RcppThread/Thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "RcppThread/RMonitor.hpp"
#include "RcppThread/Rcout.hpp"
#include "RcppThread/Rcerr.hpp"

#include <thread>
#include <future>
Expand Down Expand Up @@ -81,13 +82,15 @@ class Thread {
auto timeout = std::chrono::milliseconds(250);
while (future_.wait_for(timeout) != std::future_status::ready) {
Rcout << "";
Rcerr << "";
if (isInterrupted())
break;
std::this_thread::yield();
}
if (thread_.joinable())
thread_.join();
Rcout << "";
Rcerr << "";
checkUserInterrupt();
}

Expand Down
3 changes: 3 additions & 0 deletions inst/include/RcppThread/ThreadPool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "RcppThread/RMonitor.hpp"
#include "RcppThread/Rcout.hpp"
#include "RcppThread/Rcerr.hpp"
#include "RcppThread/quickpool.hpp"

#include <atomic>
Expand Down Expand Up @@ -232,10 +233,12 @@ ThreadPool::wait()
do {
pool_->wait(100);
Rcout << "";
Rcerr << "";
checkUserInterrupt();

} while (!pool_->done());
Rcout << "";
Rcerr << "";
}

//! waits for all jobs to finish.
Expand Down