Skip to content

Commit 6ab28bc

Browse files
[ML] Refactor model snapshot writing to use concurrent writer (#12)
This refactors the writing of the model snapshot document and model size stats document out of CJsonOutputWriter. It achieves the following improvements: - model snapshot documents do not have to be queued up. They can be written directly leveraging the safe concurrent writer that was implemented for forecasts. This minimizes the delay of a model snapshot being written which in failure cases may result into having a more recent snapshot available. - better separation of concerns and a smaller CJsonOutputWriter class. - simplifies adding new fields in the model snapshot document as needed for writing the minimum compatible version.
1 parent bb6a805 commit 6ab28bc

16 files changed

+413
-456
lines changed

bin/autodetect/Main.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include <api/CIoManager.h>
3737
#include <api/CJsonOutputWriter.h>
3838
#include <api/CLengthEncodedInputParser.h>
39+
#include <api/CModelSnapshotJsonWriter.h>
3940
#include <api/COutputChainer.h>
4041
#include <api/CSingleStreamDataAdder.h>
4142
#include <api/CSingleStreamSearcher.h>
@@ -262,8 +263,7 @@ int main(int argc, char **argv)
262263

263264
ml::core::CJsonOutputStreamWrapper wrappedOutputStream(ioMgr.outputStream());
264265

265-
// output writer for CFieldDataTyper and persistence callback
266-
ml::api::CJsonOutputWriter outputWriter(jobId, wrappedOutputStream);
266+
ml::api::CModelSnapshotJsonWriter modelSnapshotWriter(jobId, wrappedOutputStream);
267267
if (fieldConfig.initFromCmdLine(fieldConfigFile,
268268
clauseTokens) == false)
269269
{
@@ -277,8 +277,7 @@ int main(int argc, char **argv)
277277
fieldConfig,
278278
modelConfig,
279279
wrappedOutputStream,
280-
boost::bind(&ml::api::CJsonOutputWriter::reportPersistComplete,
281-
&outputWriter, _1, _2, _3, _4, _5, _6, _7, _8),
280+
boost::bind(&ml::api::CModelSnapshotJsonWriter::write, &modelSnapshotWriter, _1),
282281
periodicPersister.get(),
283282
maxQuantileInterval,
284283
timeField,
@@ -303,8 +302,10 @@ int main(int argc, char **argv)
303302
// Chain the categorizer's output to the anomaly detector's input
304303
ml::api::COutputChainer outputChainer(job);
305304

305+
ml::api::CJsonOutputWriter fieldDataTyperOutputWriter(jobId, wrappedOutputStream);
306+
306307
// The typer knows how to assign categories to records
307-
ml::api::CFieldDataTyper typer(jobId, fieldConfig, limits, outputChainer, outputWriter);
308+
ml::api::CFieldDataTyper typer(jobId, fieldConfig, limits, outputChainer, fieldDataTyperOutputWriter);
308309

309310
if (fieldConfig.fieldNameSuperset().count(ml::api::CFieldDataTyper::MLCATEGORY_NAME) > 0)
310311
{
@@ -330,7 +331,7 @@ int main(int argc, char **argv)
330331
// as it must be finalised before the skeleton is destroyed, and C++
331332
// destruction order means the skeleton will be destroyed before the output
332333
// writer as it was constructed last.
333-
outputWriter.finalise();
334+
fieldDataTyperOutputWriter.finalise();
334335

335336
if (!ioLoopSucceeded)
336337
{

include/api/CAnomalyJob.h

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <api/CDataProcessor.h>
2424
#include <api/CForecastRunner.h>
2525
#include <api/CJsonOutputWriter.h>
26+
#include <api/CModelSnapshotJsonWriter.h>
2627
#include <api/ImportExport.h>
2728

2829
#include <boost/shared_ptr.hpp>
@@ -111,15 +112,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor
111112

112113

113114
public:
114-
typedef std::function<void(
115-
core_t::TTime,
116-
const std::string &,
117-
const std::string &,
118-
size_t,
119-
const model::CResourceMonitor::SResults &,
120-
const std::string &,
121-
core_t::TTime,
122-
core_t::TTime)> TPersistCompleteFunc;
115+
typedef std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport &)> TPersistCompleteFunc;
123116
typedef model::CAnomalyDetector::TAnomalyDetectorPtr TAnomalyDetectorPtr;
124117
typedef std::vector<TAnomalyDetectorPtr> TAnomalyDetectorPtrVec;
125118
typedef std::vector<TAnomalyDetectorPtr>::iterator TAnomalyDetectorPtrVecItr;

include/api/CJsonOutputWriter.h

Lines changed: 7 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
#define INCLUDED_ml_api_CJsonOutputWriter_h
88

99
#include <core/CJsonOutputStreamWrapper.h>
10-
#include <core/CMutex.h>
1110
#include <core/CoreTypes.h>
1211
#include <core/CRapidJsonConcurrentLineWriter.h>
1312
#include <core/CSmallVector.h>
@@ -27,7 +26,6 @@
2726

2827
#include <iosfwd>
2928
#include <map>
30-
#include <queue>
3129
#include <sstream>
3230
#include <string>
3331
#include <utility>
@@ -181,98 +179,10 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler
181179
typedef TTimeBucketDataMap::iterator TTimeBucketDataMapItr;
182180
typedef TTimeBucketDataMap::const_iterator TTimeBucketDataMapCItr;
183181

184-
185-
static const std::string JOB_ID;
186-
static const std::string TIMESTAMP;
187-
static const std::string BUCKET;
188-
static const std::string LOG_TIME;
189-
static const std::string DETECTOR_INDEX;
190-
static const std::string RECORDS;
191-
static const std::string EVENT_COUNT;
192-
static const std::string IS_INTERIM;
193-
static const std::string PROBABILITY;
194-
static const std::string RAW_ANOMALY_SCORE;
195-
static const std::string ANOMALY_SCORE;
196-
static const std::string RECORD_SCORE;
197-
static const std::string INITIAL_RECORD_SCORE;
198-
static const std::string INFLUENCER_SCORE;
199-
static const std::string INITIAL_INFLUENCER_SCORE;
200-
static const std::string FIELD_NAME;
201-
static const std::string BY_FIELD_NAME;
202-
static const std::string BY_FIELD_VALUE;
203-
static const std::string CORRELATED_BY_FIELD_VALUE;
204-
static const std::string TYPICAL;
205-
static const std::string ACTUAL;
206-
static const std::string CAUSES;
207-
static const std::string FUNCTION;
208-
static const std::string FUNCTION_DESCRIPTION;
209-
static const std::string OVER_FIELD_NAME;
210-
static const std::string OVER_FIELD_VALUE;
211-
static const std::string PARTITION_FIELD_NAME;
212-
static const std::string PARTITION_FIELD_VALUE;
213-
static const std::string INITIAL_SCORE;
214-
static const std::string BUCKET_INFLUENCERS;
215-
static const std::string INFLUENCERS;
216-
static const std::string INFLUENCER_FIELD_NAME;
217-
static const std::string INFLUENCER_FIELD_VALUE;
218-
static const std::string INFLUENCER_FIELD_VALUES;
219-
static const std::string FLUSH;
220-
static const std::string ID;
221-
static const std::string LAST_FINALIZED_BUCKET_END;
222-
static const std::string QUANTILE_STATE;
223-
static const std::string QUANTILES;
224-
static const std::string MODEL_SIZE_STATS;
225-
static const std::string MODEL_BYTES;
226-
static const std::string TOTAL_BY_FIELD_COUNT;
227-
static const std::string TOTAL_OVER_FIELD_COUNT;
228-
static const std::string TOTAL_PARTITION_FIELD_COUNT;
229-
static const std::string BUCKET_ALLOCATION_FAILURES_COUNT;
230-
static const std::string MEMORY_STATUS;
231-
static const std::string CATEGORY_DEFINITION;
232-
static const std::string CATEGORY_ID;
233-
static const std::string TERMS;
234-
static const std::string REGEX;
235-
static const std::string MAX_MATCHING_LENGTH;
236-
static const std::string EXAMPLES;
237-
static const std::string MODEL_SNAPSHOT;
238-
static const std::string SNAPSHOT_ID;
239-
static const std::string SNAPSHOT_DOC_COUNT;
240-
static const std::string DESCRIPTION;
241-
static const std::string LATEST_RECORD_TIME;
242-
static const std::string BUCKET_SPAN;
243-
static const std::string LATEST_RESULT_TIME;
244-
static const std::string PROCESSING_TIME;
245-
static const std::string TIME_INFLUENCER;
246-
static const std::string PARTITION_SCORES;
247-
static const std::string SCHEDULED_EVENTS;
248-
249182
private:
250183
typedef CCategoryExamplesCollector::TStrSet TStrSet;
251184
typedef TStrSet::const_iterator TStrSetCItr;
252185

253-
struct SModelSnapshotReport
254-
{
255-
SModelSnapshotReport(core_t::TTime snapshotTimestamp,
256-
const std::string &description,
257-
const std::string &snapshotId,
258-
size_t numDocs,
259-
const model::CResourceMonitor::SResults &modelSizeStats,
260-
const std::string &normalizerState,
261-
core_t::TTime latestRecordTime,
262-
core_t::TTime latestFinalResultTime);
263-
264-
core_t::TTime s_SnapshotTimestamp;
265-
std::string s_Description;
266-
std::string s_SnapshotId;
267-
size_t s_NumDocs;
268-
model::CResourceMonitor::SResults s_ModelSizeStats;
269-
std::string s_NormalizerState;
270-
core_t::TTime s_LatestRecordTime;
271-
core_t::TTime s_LatestFinalResultTime;
272-
};
273-
274-
typedef std::queue<SModelSnapshotReport> TModelSnapshotReportQueue;
275-
276186
public:
277187
//! Constructor that causes output to be written to the specified wrapped stream
278188
CJsonOutputWriter(const std::string &jobId,
@@ -343,19 +253,6 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler
343253
//! from the CResourceMonitor via a callback
344254
void reportMemoryUsage(const model::CResourceMonitor::SResults &results);
345255

346-
//! Report information about completion of model persistence.
347-
//! This method can be called in a thread other than the one
348-
//! receiving the majority of results, so reporting is done
349-
//! asynchronously.
350-
void reportPersistComplete(core_t::TTime snapshotTimestamp,
351-
const std::string &description,
352-
const std::string &snapshotId,
353-
size_t numDocs,
354-
const model::CResourceMonitor::SResults &modelSizeStats,
355-
const std::string &normalizerState,
356-
core_t::TTime latestRecordTime,
357-
core_t::TTime latestFinalResultTime);
358-
359256
//! Acknowledge a flush request by echoing back the flush ID
360257
void acknowledgeFlush(const std::string &flushId, core_t::TTime lastFinalizedBucketEnd);
361258

@@ -418,49 +315,33 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler
418315
void addPartitionScores(const CHierarchicalResultsWriter::TResults &results,
419316
TDocumentWeakPtr weakDoc);
420317

421-
//! Write any model snapshot reports that are queuing up.
422-
void writeModelSnapshotReports(void);
423-
424-
//! Write the JSON object showing current levels of resource usage, as
425-
//! given to us from the CResourceMonitor via a callback
426-
void writeMemoryUsageObject(const model::CResourceMonitor::SResults &results);
427-
428-
//! Write the quantile's state
429-
void writeQuantileState(const std::string &state, core_t::TTime timestamp);
430-
431318
private:
432319
//! The job ID
433-
std::string m_JobId;
320+
std::string m_JobId;
434321

435322
//! JSON line writer
436-
core::CRapidJsonConcurrentLineWriter m_Writer;
323+
core::CRapidJsonConcurrentLineWriter m_Writer;
437324

438325
//! Time of last non-interim bucket written to output
439-
core_t::TTime m_LastNonInterimBucketTime;
326+
core_t::TTime m_LastNonInterimBucketTime;
440327

441328
//! Has the output been finalised?
442-
bool m_Finalised;
329+
bool m_Finalised;
443330

444331
//! Max number of records to write for each bucket/detector
445-
size_t m_RecordOutputLimit;
332+
size_t m_RecordOutputLimit;
446333

447334
//! Vector for building up documents representing nested sub-results.
448335
//! The documents in this vector will reference memory owned by
449336
//! m_JsonPoolAllocator. (Hence this is declared after the memory pool
450337
//! so that it's destroyed first when the destructor runs.)
451-
TDocumentWeakPtrVec m_NestedDocs;
338+
TDocumentWeakPtrVec m_NestedDocs;
452339

453340
//! Bucket data waiting to be written. The map is keyed on bucket time.
454341
//! The documents in this map will reference memory owned by
455342
//! m_JsonPoolAllocator. (Hence this is declared after the memory pool
456343
//! so that it's destroyed first when the destructor runs.)
457-
TTimeBucketDataMap m_BucketDataByTime;
458-
459-
//! Protects the m_ModelSnapshotReports from concurrent access.
460-
core::CMutex m_ModelSnapshotReportsQueueMutex;
461-
462-
//! Queue of model snapshot reports waiting to be output.
463-
TModelSnapshotReportQueue m_ModelSnapshotReports;
344+
TTimeBucketDataMap m_BucketDataByTime;
464345
};
465346

466347

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
#ifndef INCLUDED_ml_api_CModelSizeStatsJsonWriter_h
7+
#define INCLUDED_ml_api_CModelSizeStatsJsonWriter_h
8+
9+
#include <core/CNonInstantiatable.h>
10+
#include <core/CRapidJsonConcurrentLineWriter.h>
11+
12+
#include <model/CResourceMonitor.h>
13+
14+
#include <api/ImportExport.h>
15+
16+
#include <string>
17+
18+
namespace ml
19+
{
20+
namespace api
21+
{
22+
23+
//! \brief
24+
//! A static utility for writing the model_size_stats document in JSON.
25+
class API_EXPORT CModelSizeStatsJsonWriter : private core::CNonInstantiatable
26+
{
27+
public:
28+
//! Writes the model size stats in the \p results in JSON format.
29+
static void write(const std::string &jobId,
30+
const model::CResourceMonitor::SResults &results,
31+
core::CRapidJsonConcurrentLineWriter &writer);
32+
};
33+
34+
}
35+
}
36+
37+
#endif // INCLUDED_ml_api_CModelSizeStatsJsonWriter_h
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
#ifndef INCLUDED_ml_api_CModelSnapshotJsonWriter_h
7+
#define INCLUDED_ml_api_CModelSnapshotJsonWriter_h
8+
9+
#include <core/CJsonOutputStreamWrapper.h>
10+
#include <core/CRapidJsonConcurrentLineWriter.h>
11+
12+
#include <model/CResourceMonitor.h>
13+
14+
#include <api/ImportExport.h>
15+
16+
#include <string>
17+
18+
namespace ml
19+
{
20+
namespace api
21+
{
22+
23+
//! \brief
24+
//! Write model snapshots in JSON format
25+
//!
26+
//! DESCRIPTION:\n
27+
//! Outputs the model snapshot documents that accompany each state persist.
28+
//!
29+
class API_EXPORT CModelSnapshotJsonWriter
30+
{
31+
public:
32+
//! Structure to store the model snapshot metadata
33+
struct SModelSnapshotReport
34+
{
35+
core_t::TTime s_SnapshotTimestamp;
36+
std::string s_Description;
37+
std::string s_SnapshotId;
38+
size_t s_NumDocs;
39+
model::CResourceMonitor::SResults s_ModelSizeStats;
40+
std::string s_NormalizerState;
41+
core_t::TTime s_LatestRecordTime;
42+
core_t::TTime s_LatestFinalResultTime;
43+
};
44+
45+
public:
46+
//! Constructor that causes output to be written to the specified wrapped stream
47+
CModelSnapshotJsonWriter(const std::string &jobId,
48+
core::CJsonOutputStreamWrapper &strmOut);
49+
50+
//! Writes the given model snapshot in JSON format.
51+
void write(const SModelSnapshotReport &report);
52+
53+
//! Write the quantile's state
54+
static void writeQuantileState(const std::string &jobId,
55+
const std::string &state,
56+
core_t::TTime timestamp,
57+
core::CRapidJsonConcurrentLineWriter &writer);
58+
59+
private:
60+
//! The job ID
61+
std::string m_JobId;
62+
63+
//! JSON line writer
64+
core::CRapidJsonConcurrentLineWriter m_Writer;
65+
};
66+
67+
68+
}
69+
}
70+
71+
#endif // INCLUDED_ml_api_CModelSnapshotJsonWriter_h

0 commit comments

Comments
 (0)