Skip to content

Commit 94e45fa

Browse files
authored
ARROW-17859: [C++] Use self-pipe in signal-receiving StopSource (#14250)
Authored-by: Antoine Pitrou <antoine@python.org> Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent ada7e23 commit 94e45fa

File tree

5 files changed

+321
-49
lines changed

5 files changed

+321
-49
lines changed

cpp/src/arrow/util/cancel.cc

Lines changed: 147 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
#include <atomic>
2121
#include <mutex>
2222
#include <sstream>
23+
#include <thread>
2324
#include <utility>
2425

2526
#include "arrow/result.h"
27+
#include "arrow/util/atfork_internal.h"
2628
#include "arrow/util/io_util.h"
2729
#include "arrow/util/logging.h"
30+
#include "arrow/util/mutex.h"
2831
#include "arrow/util/visibility.h"
2932

3033
namespace arrow {
@@ -33,7 +36,9 @@ namespace arrow {
3336
#error Lock-free atomic int required for signal safety
3437
#endif
3538

39+
using internal::AtForkHandler;
3640
using internal::ReinstateSignalHandler;
41+
using internal::SelfPipe;
3742
using internal::SetSignalHandler;
3843
using internal::SignalHandler;
3944

@@ -99,16 +104,58 @@ Status StopToken::Poll() const {
99104

100105
namespace {
101106

102-
struct SignalStopState {
107+
struct SignalStopState : public std::enable_shared_from_this<SignalStopState> {
103108
struct SavedSignalHandler {
104109
int signum;
105110
SignalHandler handler;
106111
};
107112

113+
// NOTE: shared_from_this() doesn't work from constructor
114+
void Init() {
115+
// XXX this pattern appears in several places, factor it out?
116+
atfork_handler_ = std::make_shared<AtForkHandler>(
117+
/*before=*/
118+
[weak_self = std::weak_ptr<SignalStopState>(shared_from_this())] {
119+
auto self = weak_self.lock();
120+
if (self) {
121+
self->BeforeFork();
122+
}
123+
return self;
124+
},
125+
/*parent_after=*/
126+
[](std::any token) {
127+
auto self = std::any_cast<std::shared_ptr<SignalStopState>>(std::move(token));
128+
self->ParentAfterFork();
129+
},
130+
/*child_after=*/
131+
[](std::any token) {
132+
auto self = std::any_cast<std::shared_ptr<SignalStopState>>(std::move(token));
133+
self->ChildAfterFork();
134+
});
135+
RegisterAtFork(atfork_handler_);
136+
}
137+
108138
Status RegisterHandlers(const std::vector<int>& signals) {
139+
std::lock_guard<std::mutex> lock(mutex_);
109140
if (!saved_handlers_.empty()) {
110141
return Status::Invalid("Signal handlers already registered");
111142
}
143+
if (!self_pipe_) {
144+
// Make sure the self-pipe is initialized
145+
// (NOTE: avoid std::atomic_is_lock_free() which may require libatomic)
146+
#if ATOMIC_POINTER_LOCK_FREE != 2
147+
return Status::NotImplemented(
148+
"Cannot setup signal StopSource because atomic pointers are not "
149+
"lock-free on this platform");
150+
#else
151+
ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
152+
#endif
153+
}
154+
if (!signal_receiving_thread_) {
155+
// Spawn thread for receiving signals
156+
SpawnSignalReceivingThread();
157+
}
158+
self_pipe_ptr_.store(self_pipe_.get());
112159
for (int signum : signals) {
113160
ARROW_ASSIGN_OR_RAISE(auto handler,
114161
SetSignalHandler(signum, SignalHandler{&HandleSignal}));
@@ -118,78 +165,135 @@ struct SignalStopState {
118165
}
119166

120167
void UnregisterHandlers() {
168+
std::lock_guard<std::mutex> lock(mutex_);
169+
self_pipe_ptr_.store(nullptr);
121170
auto handlers = std::move(saved_handlers_);
122171
for (const auto& h : handlers) {
123172
ARROW_CHECK_OK(SetSignalHandler(h.signum, h.handler).status());
124173
}
125174
}
126175

127176
~SignalStopState() {
177+
atfork_handler_.reset();
128178
UnregisterHandlers();
129179
Disable();
180+
if (signal_receiving_thread_) {
181+
// Tell the receiving thread to stop
182+
auto st = self_pipe_->Shutdown();
183+
ARROW_WARN_NOT_OK(st, "Failed to shutdown self-pipe");
184+
if (st.ok()) {
185+
signal_receiving_thread_->join();
186+
} else {
187+
signal_receiving_thread_->detach();
188+
}
189+
}
130190
}
131191

132-
StopSource* stop_source() { return stop_source_.get(); }
192+
StopSource* stop_source() {
193+
std::lock_guard<std::mutex> lock(mutex_);
194+
return stop_source_.get();
195+
}
133196

134-
bool enabled() { return stop_source_ != nullptr; }
197+
bool enabled() {
198+
std::lock_guard<std::mutex> lock(mutex_);
199+
return stop_source_ != nullptr;
200+
}
135201

136202
void Enable() {
137-
// Before creating a new StopSource, delete any lingering reference to
138-
// the previous one in the trash can. See DoHandleSignal() for details.
139-
EmptyTrashCan();
140-
std::atomic_store(&stop_source_, std::make_shared<StopSource>());
203+
std::lock_guard<std::mutex> lock(mutex_);
204+
stop_source_ = std::make_shared<StopSource>();
141205
}
142206

143-
void Disable() { std::atomic_store(&stop_source_, NullSource()); }
207+
void Disable() {
208+
std::lock_guard<std::mutex> lock(mutex_);
209+
stop_source_.reset();
210+
}
144211

145-
static SignalStopState* instance() { return &instance_; }
212+
static SignalStopState* instance() {
213+
static std::shared_ptr<SignalStopState> instance = []() {
214+
auto ptr = std::make_shared<SignalStopState>();
215+
ptr->Init();
216+
return ptr;
217+
}();
218+
return instance.get();
219+
}
146220

147221
private:
148-
// For readability
149-
std::shared_ptr<StopSource> NullSource() { return nullptr; }
150-
151-
void EmptyTrashCan() { std::atomic_store(&trash_can_, NullSource()); }
222+
void SpawnSignalReceivingThread() {
223+
signal_receiving_thread_ = std::make_unique<std::thread>(ReceiveSignals, self_pipe_);
224+
}
152225

153-
static void HandleSignal(int signum) { instance_.DoHandleSignal(signum); }
226+
static void HandleSignal(int signum) {
227+
auto self = instance();
228+
if (self) {
229+
self->DoHandleSignal(signum);
230+
}
231+
}
154232

155233
void DoHandleSignal(int signum) {
156234
// async-signal-safe code only
157-
auto source = std::atomic_load(&stop_source_);
158-
if (source) {
159-
source->RequestStopFromSignal(signum);
160-
// Disable() may have been called in the meantime, but we can't
161-
// deallocate a shared_ptr here, so instead move it to a "trash can".
162-
// This minimizes the possibility of running a deallocator here,
163-
// however it doesn't entirely preclude it.
164-
//
165-
// Possible case:
166-
// - a signal handler (A) starts running, fetches the current source
167-
// - Disable() then Enable() are called, emptying the trash can and
168-
// replacing the current source
169-
// - a signal handler (B) starts running, fetches the current source
170-
// - signal handler A resumes, moves its source (the old source) into
171-
// the trash can (the only remaining reference)
172-
// - signal handler B resumes, moves its source (the current source)
173-
// into the trash can. This triggers deallocation of the old source,
174-
// since the trash can had the only remaining reference to it.
175-
//
176-
// This case should be sufficiently unlikely, but we cannot entirely
177-
// rule it out. The problem might be solved properly with a lock-free
178-
// linked list of StopSources.
179-
std::atomic_store(&trash_can_, std::move(source));
235+
SelfPipe* self_pipe = self_pipe_ptr_.load();
236+
if (self_pipe) {
237+
self_pipe->Send(/*payload=*/signum);
180238
}
181239
ReinstateSignalHandler(signum, &HandleSignal);
182240
}
183241

184-
std::shared_ptr<StopSource> stop_source_;
185-
std::shared_ptr<StopSource> trash_can_;
242+
static void ReceiveSignals(std::shared_ptr<SelfPipe> self_pipe) {
243+
// Wait for signals on the self-pipe and propagate them to the current StopSource
244+
DCHECK(self_pipe);
245+
while (true) {
246+
auto maybe_payload = self_pipe->Wait();
247+
if (maybe_payload.status().IsInvalid()) {
248+
// Pipe shut down
249+
return;
250+
}
251+
if (!maybe_payload.ok()) {
252+
maybe_payload.status().Warn();
253+
return;
254+
}
255+
const int signum = static_cast<int>(maybe_payload.ValueUnsafe());
256+
instance()->ReceiveSignal(signum);
257+
}
258+
}
186259

187-
std::vector<SavedSignalHandler> saved_handlers_;
260+
void ReceiveSignal(int signum) {
261+
std::lock_guard<std::mutex> lock(mutex_);
262+
if (stop_source_) {
263+
stop_source_->RequestStopFromSignal(signum);
264+
}
265+
}
188266

189-
static SignalStopState instance_;
190-
};
267+
// At-fork handlers
268+
269+
void BeforeFork() { mutex_.lock(); }
270+
271+
void ParentAfterFork() { mutex_.unlock(); }
191272

192-
SignalStopState SignalStopState::instance_{};
273+
void ChildAfterFork() {
274+
new (&mutex_) std::mutex;
275+
// Leak previous thread, as it has become invalid.
276+
// We can't spawn a new one here as it would have unfortunate side effects;
277+
// especially in the frequent context of a fork+exec.
278+
// (for example the Python subprocess module closes all fds before calling exec)
279+
ARROW_UNUSED(signal_receiving_thread_.release());
280+
// Make internal state consistent: with no listening thread, we shouldn't
281+
// feed the self-pipe from the signal handler.
282+
UnregisterHandlers();
283+
}
284+
285+
std::mutex mutex_;
286+
std::vector<SavedSignalHandler> saved_handlers_;
287+
std::shared_ptr<StopSource> stop_source_;
288+
std::unique_ptr<std::thread> signal_receiving_thread_;
289+
std::shared_ptr<AtForkHandler> atfork_handler_;
290+
291+
// For signal handler interaction
292+
std::shared_ptr<SelfPipe> self_pipe_;
293+
// Raw atomic pointer, as atomic load/store of a shared_ptr may not be lock-free
294+
// (it is not on libstdc++).
295+
std::atomic<SelfPipe*> self_pipe_ptr_;
296+
};
193297

194298
} // namespace
195299

cpp/src/arrow/util/cancel.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class ARROW_EXPORT StopSource {
4242
// Consumer API (the side that stops)
4343
void RequestStop();
4444
void RequestStop(Status error);
45+
// Async-signal-safe. TODO Deprecate this?
4546
void RequestStopFromSignal(int signum);
4647

4748
StopToken token();
@@ -103,6 +104,10 @@ ARROW_EXPORT
103104
void ResetSignalStopSource();
104105

105106
/// EXPERIMENTAL: Register signal handler triggering the signal-receiving StopSource
107+
///
108+
/// Note that those handlers are automatically un-registered in a fork()ed process,
109+
/// therefore the child process will need to call RegisterCancellingSignalHandler()
110+
/// if desired.
106111
ARROW_EXPORT
107112
Status RegisterCancellingSignalHandler(const std::vector<int>& signals);
108113

cpp/src/arrow/util/cancel_test.cc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include <atomic>
1919
#include <cmath>
20+
#include <functional>
2021
#include <optional>
2122
#include <sstream>
2223
#include <string>
@@ -29,6 +30,8 @@
2930
#include <signal.h>
3031
#ifndef _WIN32
3132
#include <sys/time.h> // for setitimer()
33+
#include <sys/types.h>
34+
#include <unistd.h>
3235
#endif
3336

3437
#include "arrow/testing/gtest_util.h"
@@ -201,6 +204,23 @@ class SignalCancelTest : public CancelTest {
201204
ASSERT_EQ(internal::SignalFromStatus(st), expected_signal_);
202205
}
203206

207+
#ifndef _WIN32
208+
void RunInChild(std::function<void()> func) {
209+
auto child_pid = fork();
210+
if (child_pid == -1) {
211+
ASSERT_OK(internal::IOErrorFromErrno(errno, "Error calling fork(): "));
212+
}
213+
if (child_pid == 0) {
214+
// Child
215+
ASSERT_NO_FATAL_FAILURE(func()) << "Failure in child process";
216+
std::exit(0);
217+
} else {
218+
// Parent
219+
AssertChildExit(child_pid);
220+
}
221+
}
222+
#endif
223+
204224
protected:
205225
#ifdef _WIN32
206226
const int expected_signal_ = SIGINT;
@@ -238,6 +258,54 @@ TEST_F(SignalCancelTest, RegisterUnregister) {
238258
AssertStopRequested();
239259
}
240260

261+
#if !(defined(_WIN32) || defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER) || \
262+
defined(THREAD_SANITIZER))
263+
TEST_F(SignalCancelTest, ForkSafetyUnregisteredHandlers) {
264+
RunInChild([&]() {
265+
// Child
266+
TriggerSignal();
267+
AssertStopNotRequested();
268+
269+
RegisterHandler();
270+
TriggerSignal();
271+
AssertStopRequested();
272+
});
273+
274+
// Parent: shouldn't notice signals raised in child
275+
AssertStopNotRequested();
276+
277+
// Stop source still usable in parent
278+
TriggerSignal();
279+
AssertStopNotRequested();
280+
281+
RegisterHandler();
282+
TriggerSignal();
283+
AssertStopRequested();
284+
}
285+
286+
TEST_F(SignalCancelTest, ForkSafetyRegisteredHandlers) {
287+
RegisterHandler();
288+
289+
RunInChild([&]() {
290+
// Child: signal handlers are unregistered and need to be re-registered
291+
TriggerSignal();
292+
AssertStopNotRequested();
293+
294+
// Can re-register and receive signals
295+
RegisterHandler();
296+
TriggerSignal();
297+
AssertStopRequested();
298+
});
299+
300+
// Parent: shouldn't notice signals raised in child
301+
AssertStopNotRequested();
302+
303+
// Stop source still usable in parent
304+
TriggerSignal();
305+
AssertStopRequested();
306+
}
307+
#endif
308+
241309
TEST_F(CancelTest, ThreadedPollSuccess) {
242310
constexpr int kNumThreads = 10;
243311

0 commit comments

Comments
 (0)