Skip to content

Commit e1cdf53

Browse files
Al2Klimovyhabteab
authored andcommitted
Merge pull request #9321 from Icinga/perfdata-resume-signal
Perfdata writers: disconnect handlers from signals in Pause()
2 parents a04bb06 + 56933b8 commit e1cdf53

12 files changed

+40
-16
lines changed

lib/perfdata/elasticsearchwriter.cpp

+11-6
Original file line numberDiff line numberDiff line change
@@ -95,23 +95,28 @@ void ElasticsearchWriter::Resume()
9595
m_FlushTimer->Reschedule(0);
9696

9797
/* Register for new metrics. */
98-
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
98+
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
99+
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
99100
CheckResultHandler(checkable, cr);
100101
});
101-
Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type,
102-
const MessageOrigin::Ptr&) {
102+
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
103+
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
103104
StateChangeHandler(checkable, cr, type);
104105
});
105-
Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification, const Checkable::Ptr& checkable,
106-
const std::set<User::Ptr>& users, const NotificationType& type, const CheckResult::Ptr& cr, const String& author,
107-
const String& text, const MessageOrigin::Ptr&) {
106+
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification,
107+
const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type,
108+
const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) {
108109
NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
109110
});
110111
}
111112

112113
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
113114
void ElasticsearchWriter::Pause()
114115
{
116+
m_HandleCheckResults.disconnect();
117+
m_HandleStateChanges.disconnect();
118+
m_HandleNotifications.disconnect();
119+
115120
Flush();
116121
m_WorkQueue.Join();
117122
Flush();

lib/perfdata/elasticsearchwriter.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class ElasticsearchWriter final : public ObjectImpl<ElasticsearchWriter>
3131
private:
3232
String m_EventPrefix;
3333
WorkQueue m_WorkQueue{10000000, 1};
34+
boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
3435
Timer::Ptr m_FlushTimer;
3536
std::vector<String> m_DataBuffer;
3637
std::mutex m_DataBufferMutex;

lib/perfdata/gelfwriter.cpp

+11-6
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,28 @@ void GelfWriter::Resume()
9090
m_ReconnectTimer->Reschedule(0);
9191

9292
/* Register event handlers. */
93-
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
93+
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
94+
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
9495
CheckResultHandler(checkable, cr);
9596
});
96-
Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification, const Checkable::Ptr& checkable,
97-
const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr, const String& author,
98-
const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
97+
m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
98+
const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
99+
const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
99100
NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
100101
});
101-
Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type,
102-
const MessageOrigin::Ptr&) {
102+
m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
103+
const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
103104
StateChangeHandler(checkable, cr, type);
104105
});
105106
}
106107

107108
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
108109
void GelfWriter::Pause()
109110
{
111+
m_HandleCheckResults.disconnect();
112+
m_HandleNotifications.disconnect();
113+
m_HandleStateChanges.disconnect();
114+
110115
m_ReconnectTimer.reset();
111116

112117
try {

lib/perfdata/gelfwriter.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class GelfWriter final : public ObjectImpl<GelfWriter>
3636
OptionalTlsStream m_Stream;
3737
WorkQueue m_WorkQueue{10000000, 1};
3838

39+
boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
3940
Timer::Ptr m_ReconnectTimer;
4041

4142
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);

lib/perfdata/graphitewriter.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ void GraphiteWriter::Resume()
9595
m_ReconnectTimer->Reschedule(0);
9696

9797
/* Register event handlers. */
98-
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
98+
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
99+
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
99100
CheckResultHandler(checkable, cr);
100101
});
101102
}
@@ -105,6 +106,7 @@ void GraphiteWriter::Resume()
105106
*/
106107
void GraphiteWriter::Pause()
107108
{
109+
m_HandleCheckResults.disconnect();
108110
m_ReconnectTimer.reset();
109111

110112
try {

lib/perfdata/graphitewriter.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class GraphiteWriter final : public ObjectImpl<GraphiteWriter>
4141
std::mutex m_StreamMutex;
4242
WorkQueue m_WorkQueue{10000000, 1};
4343

44+
boost::signals2::connection m_HandleCheckResults;
4445
Timer::Ptr m_ReconnectTimer;
4546

4647
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);

lib/perfdata/influxdbcommonwriter.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,17 @@ void InfluxdbCommonWriter::Resume()
9797
m_FlushTimer->Reschedule(0);
9898

9999
/* Register for new metrics. */
100-
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
100+
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
101+
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
101102
CheckResultHandler(checkable, cr);
102103
});
103104
}
104105

105106
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
106107
void InfluxdbCommonWriter::Pause()
107108
{
109+
m_HandleCheckResults.disconnect();
110+
108111
/* Force a flush. */
109112
Log(LogDebug, GetReflectionType()->GetName())
110113
<< "Processing pending tasks and flushing data buffers.";

lib/perfdata/influxdbcommonwriter.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class InfluxdbCommonWriter : public ObjectImpl<InfluxdbCommonWriter>
4747
virtual Url::Ptr AssembleUrl() = 0;
4848

4949
private:
50+
boost::signals2::connection m_HandleCheckResults;
5051
Timer::Ptr m_FlushTimer;
5152
WorkQueue m_WorkQueue{10000000, 1};
5253
std::vector<String> m_DataBuffer;

lib/perfdata/opentsdbwriter.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ void OpenTsdbWriter::Resume()
8181
m_ReconnectTimer->Start();
8282
m_ReconnectTimer->Reschedule(0);
8383

84-
Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
84+
m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
8585
CheckResultHandler(checkable, cr);
8686
});
8787
}
@@ -91,6 +91,7 @@ void OpenTsdbWriter::Resume()
9191
*/
9292
void OpenTsdbWriter::Pause()
9393
{
94+
m_HandleCheckResults.disconnect();
9495
m_ReconnectTimer.reset();
9596

9697
Log(LogInformation, "OpentsdbWriter")

lib/perfdata/opentsdbwriter.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class OpenTsdbWriter final : public ObjectImpl<OpenTsdbWriter>
3737
private:
3838
Shared<AsioTcpStream>::Ptr m_Stream;
3939

40+
boost::signals2::connection m_HandleCheckResults;
4041
Timer::Ptr m_ReconnectTimer;
4142

4243
Dictionary::Ptr m_ServiceConfigTemplate;

lib/perfdata/perfdatawriter.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ void PerfdataWriter::Resume()
5353
Log(LogInformation, "PerfdataWriter")
5454
<< "'" << GetName() << "' resumed.";
5555

56-
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
56+
m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
57+
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
5758
CheckResultHandler(checkable, cr);
5859
});
5960

@@ -68,6 +69,7 @@ void PerfdataWriter::Resume()
6869

6970
void PerfdataWriter::Pause()
7071
{
72+
m_HandleCheckResults.disconnect();
7173
m_RotationTimer.reset();
7274

7375
#ifdef I2_DEBUG

lib/perfdata/perfdatawriter.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class PerfdataWriter final : public ObjectImpl<PerfdataWriter>
3434
void Pause() override;
3535

3636
private:
37+
boost::signals2::connection m_HandleCheckResults;
3738
Timer::Ptr m_RotationTimer;
3839
std::ofstream m_ServiceOutputFile;
3940
std::ofstream m_HostOutputFile;

0 commit comments

Comments
 (0)