Skip to content

Commit af650fd

Browse files
[ML] Refactor model snapshot writing to use concurrent writer (#12)
This refactors the writing of the model snapshot document and model size stats document out of CJsonOutputWriter. It achieves the following improvements: - model snapshot documents do not have to be queued up. They can be written directly leveraging the safe concurrent writer that was implemented for forecasts. This minimizes the delay of a model snapshot being written which in failure cases may result into having a more recent snapshot available. - better separation of concerns and a smaller CJsonOutputWriter class. - simplifies adding new fields in the model snapshot document as needed for writing the minimum compatible version.
1 parent ad634ff commit af650fd

16 files changed

+449
-456
lines changed

bin/autodetect/Main.cc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include <api/CIoManager.h>
4646
#include <api/CJsonOutputWriter.h>
4747
#include <api/CLengthEncodedInputParser.h>
48+
#include <api/CModelSnapshotJsonWriter.h>
4849
#include <api/COutputChainer.h>
4950
#include <api/CSingleStreamDataAdder.h>
5051
#include <api/CSingleStreamSearcher.h>
@@ -271,8 +272,7 @@ int main(int argc, char **argv)
271272

272273
ml::core::CJsonOutputStreamWrapper wrappedOutputStream(ioMgr.outputStream());
273274

274-
// output writer for CFieldDataTyper and persistence callback
275-
ml::api::CJsonOutputWriter outputWriter(jobId, wrappedOutputStream);
275+
ml::api::CModelSnapshotJsonWriter modelSnapshotWriter(jobId, wrappedOutputStream);
276276
if (fieldConfig.initFromCmdLine(fieldConfigFile,
277277
clauseTokens) == false)
278278
{
@@ -286,8 +286,7 @@ int main(int argc, char **argv)
286286
fieldConfig,
287287
modelConfig,
288288
wrappedOutputStream,
289-
boost::bind(&ml::api::CJsonOutputWriter::reportPersistComplete,
290-
&outputWriter, _1, _2, _3, _4, _5, _6, _7, _8),
289+
boost::bind(&ml::api::CModelSnapshotJsonWriter::write, &modelSnapshotWriter, _1),
291290
periodicPersister.get(),
292291
maxQuantileInterval,
293292
timeField,
@@ -312,8 +311,10 @@ int main(int argc, char **argv)
312311
// Chain the categorizer's output to the anomaly detector's input
313312
ml::api::COutputChainer outputChainer(job);
314313

314+
ml::api::CJsonOutputWriter fieldDataTyperOutputWriter(jobId, wrappedOutputStream);
315+
315316
// The typer knows how to assign categories to records
316-
ml::api::CFieldDataTyper typer(jobId, fieldConfig, limits, outputChainer, outputWriter);
317+
ml::api::CFieldDataTyper typer(jobId, fieldConfig, limits, outputChainer, fieldDataTyperOutputWriter);
317318

318319
if (fieldConfig.fieldNameSuperset().count(ml::api::CFieldDataTyper::MLCATEGORY_NAME) > 0)
319320
{
@@ -339,7 +340,7 @@ int main(int argc, char **argv)
339340
// as it must be finalised before the skeleton is destroyed, and C++
340341
// destruction order means the skeleton will be destroyed before the output
341342
// writer as it was constructed last.
342-
outputWriter.finalise();
343+
fieldDataTyperOutputWriter.finalise();
343344

344345
if (!ioLoopSucceeded)
345346
{

include/api/CAnomalyJob.h

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include <api/CDataProcessor.h>
3333
#include <api/CForecastRunner.h>
3434
#include <api/CJsonOutputWriter.h>
35+
#include <api/CModelSnapshotJsonWriter.h>
3536
#include <api/ImportExport.h>
3637

3738
#include <boost/shared_ptr.hpp>
@@ -120,15 +121,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor
120121

121122

122123
public:
123-
typedef std::function<void(
124-
core_t::TTime,
125-
const std::string &,
126-
const std::string &,
127-
size_t,
128-
const model::CResourceMonitor::SResults &,
129-
const std::string &,
130-
core_t::TTime,
131-
core_t::TTime)> TPersistCompleteFunc;
124+
typedef std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport &)> TPersistCompleteFunc;
132125
typedef model::CAnomalyDetector::TAnomalyDetectorPtr TAnomalyDetectorPtr;
133126
typedef std::vector<TAnomalyDetectorPtr> TAnomalyDetectorPtrVec;
134127
typedef std::vector<TAnomalyDetectorPtr>::iterator TAnomalyDetectorPtrVecItr;

include/api/CJsonOutputWriter.h

Lines changed: 7 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
#define INCLUDED_ml_api_CJsonOutputWriter_h
1717

1818
#include <core/CJsonOutputStreamWrapper.h>
19-
#include <core/CMutex.h>
2019
#include <core/CoreTypes.h>
2120
#include <core/CRapidJsonConcurrentLineWriter.h>
2221
#include <core/CSmallVector.h>
@@ -36,7 +35,6 @@
3635

3736
#include <iosfwd>
3837
#include <map>
39-
#include <queue>
4038
#include <sstream>
4139
#include <string>
4240
#include <utility>
@@ -190,98 +188,10 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler
190188
typedef TTimeBucketDataMap::iterator TTimeBucketDataMapItr;
191189
typedef TTimeBucketDataMap::const_iterator TTimeBucketDataMapCItr;
192190

193-
194-
static const std::string JOB_ID;
195-
static const std::string TIMESTAMP;
196-
static const std::string BUCKET;
197-
static const std::string LOG_TIME;
198-
static const std::string DETECTOR_INDEX;
199-
static const std::string RECORDS;
200-
static const std::string EVENT_COUNT;
201-
static const std::string IS_INTERIM;
202-
static const std::string PROBABILITY;
203-
static const std::string RAW_ANOMALY_SCORE;
204-
static const std::string ANOMALY_SCORE;
205-
static const std::string RECORD_SCORE;
206-
static const std::string INITIAL_RECORD_SCORE;
207-
static const std::string INFLUENCER_SCORE;
208-
static const std::string INITIAL_INFLUENCER_SCORE;
209-
static const std::string FIELD_NAME;
210-
static const std::string BY_FIELD_NAME;
211-
static const std::string BY_FIELD_VALUE;
212-
static const std::string CORRELATED_BY_FIELD_VALUE;
213-
static const std::string TYPICAL;
214-
static const std::string ACTUAL;
215-
static const std::string CAUSES;
216-
static const std::string FUNCTION;
217-
static const std::string FUNCTION_DESCRIPTION;
218-
static const std::string OVER_FIELD_NAME;
219-
static const std::string OVER_FIELD_VALUE;
220-
static const std::string PARTITION_FIELD_NAME;
221-
static const std::string PARTITION_FIELD_VALUE;
222-
static const std::string INITIAL_SCORE;
223-
static const std::string BUCKET_INFLUENCERS;
224-
static const std::string INFLUENCERS;
225-
static const std::string INFLUENCER_FIELD_NAME;
226-
static const std::string INFLUENCER_FIELD_VALUE;
227-
static const std::string INFLUENCER_FIELD_VALUES;
228-
static const std::string FLUSH;
229-
static const std::string ID;
230-
static const std::string LAST_FINALIZED_BUCKET_END;
231-
static const std::string QUANTILE_STATE;
232-
static const std::string QUANTILES;
233-
static const std::string MODEL_SIZE_STATS;
234-
static const std::string MODEL_BYTES;
235-
static const std::string TOTAL_BY_FIELD_COUNT;
236-
static const std::string TOTAL_OVER_FIELD_COUNT;
237-
static const std::string TOTAL_PARTITION_FIELD_COUNT;
238-
static const std::string BUCKET_ALLOCATION_FAILURES_COUNT;
239-
static const std::string MEMORY_STATUS;
240-
static const std::string CATEGORY_DEFINITION;
241-
static const std::string CATEGORY_ID;
242-
static const std::string TERMS;
243-
static const std::string REGEX;
244-
static const std::string MAX_MATCHING_LENGTH;
245-
static const std::string EXAMPLES;
246-
static const std::string MODEL_SNAPSHOT;
247-
static const std::string SNAPSHOT_ID;
248-
static const std::string SNAPSHOT_DOC_COUNT;
249-
static const std::string DESCRIPTION;
250-
static const std::string LATEST_RECORD_TIME;
251-
static const std::string BUCKET_SPAN;
252-
static const std::string LATEST_RESULT_TIME;
253-
static const std::string PROCESSING_TIME;
254-
static const std::string TIME_INFLUENCER;
255-
static const std::string PARTITION_SCORES;
256-
static const std::string SCHEDULED_EVENTS;
257-
258191
private:
259192
typedef CCategoryExamplesCollector::TStrSet TStrSet;
260193
typedef TStrSet::const_iterator TStrSetCItr;
261194

262-
struct SModelSnapshotReport
263-
{
264-
SModelSnapshotReport(core_t::TTime snapshotTimestamp,
265-
const std::string &description,
266-
const std::string &snapshotId,
267-
size_t numDocs,
268-
const model::CResourceMonitor::SResults &modelSizeStats,
269-
const std::string &normalizerState,
270-
core_t::TTime latestRecordTime,
271-
core_t::TTime latestFinalResultTime);
272-
273-
core_t::TTime s_SnapshotTimestamp;
274-
std::string s_Description;
275-
std::string s_SnapshotId;
276-
size_t s_NumDocs;
277-
model::CResourceMonitor::SResults s_ModelSizeStats;
278-
std::string s_NormalizerState;
279-
core_t::TTime s_LatestRecordTime;
280-
core_t::TTime s_LatestFinalResultTime;
281-
};
282-
283-
typedef std::queue<SModelSnapshotReport> TModelSnapshotReportQueue;
284-
285195
public:
286196
//! Constructor that causes output to be written to the specified wrapped stream
287197
CJsonOutputWriter(const std::string &jobId,
@@ -352,19 +262,6 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler
352262
//! from the CResourceMonitor via a callback
353263
void reportMemoryUsage(const model::CResourceMonitor::SResults &results);
354264

355-
//! Report information about completion of model persistence.
356-
//! This method can be called in a thread other than the one
357-
//! receiving the majority of results, so reporting is done
358-
//! asynchronously.
359-
void reportPersistComplete(core_t::TTime snapshotTimestamp,
360-
const std::string &description,
361-
const std::string &snapshotId,
362-
size_t numDocs,
363-
const model::CResourceMonitor::SResults &modelSizeStats,
364-
const std::string &normalizerState,
365-
core_t::TTime latestRecordTime,
366-
core_t::TTime latestFinalResultTime);
367-
368265
//! Acknowledge a flush request by echoing back the flush ID
369266
void acknowledgeFlush(const std::string &flushId, core_t::TTime lastFinalizedBucketEnd);
370267

@@ -427,49 +324,33 @@ class API_EXPORT CJsonOutputWriter : public COutputHandler
427324
void addPartitionScores(const CHierarchicalResultsWriter::TResults &results,
428325
TDocumentWeakPtr weakDoc);
429326

430-
//! Write any model snapshot reports that are queuing up.
431-
void writeModelSnapshotReports(void);
432-
433-
//! Write the JSON object showing current levels of resource usage, as
434-
//! given to us from the CResourceMonitor via a callback
435-
void writeMemoryUsageObject(const model::CResourceMonitor::SResults &results);
436-
437-
//! Write the quantile's state
438-
void writeQuantileState(const std::string &state, core_t::TTime timestamp);
439-
440327
private:
441328
//! The job ID
442-
std::string m_JobId;
329+
std::string m_JobId;
443330

444331
//! JSON line writer
445-
core::CRapidJsonConcurrentLineWriter m_Writer;
332+
core::CRapidJsonConcurrentLineWriter m_Writer;
446333

447334
//! Time of last non-interim bucket written to output
448-
core_t::TTime m_LastNonInterimBucketTime;
335+
core_t::TTime m_LastNonInterimBucketTime;
449336

450337
//! Has the output been finalised?
451-
bool m_Finalised;
338+
bool m_Finalised;
452339

453340
//! Max number of records to write for each bucket/detector
454-
size_t m_RecordOutputLimit;
341+
size_t m_RecordOutputLimit;
455342

456343
//! Vector for building up documents representing nested sub-results.
457344
//! The documents in this vector will reference memory owned by
458345
//! m_JsonPoolAllocator. (Hence this is declared after the memory pool
459346
//! so that it's destroyed first when the destructor runs.)
460-
TDocumentWeakPtrVec m_NestedDocs;
347+
TDocumentWeakPtrVec m_NestedDocs;
461348

462349
//! Bucket data waiting to be written. The map is keyed on bucket time.
463350
//! The documents in this map will reference memory owned by
464351
//! m_JsonPoolAllocator. (Hence this is declared after the memory pool
465352
//! so that it's destroyed first when the destructor runs.)
466-
TTimeBucketDataMap m_BucketDataByTime;
467-
468-
//! Protects the m_ModelSnapshotReports from concurrent access.
469-
core::CMutex m_ModelSnapshotReportsQueueMutex;
470-
471-
//! Queue of model snapshot reports waiting to be output.
472-
TModelSnapshotReportQueue m_ModelSnapshotReports;
353+
TTimeBucketDataMap m_BucketDataByTime;
473354
};
474355

475356

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* ELASTICSEARCH CONFIDENTIAL
3+
*
4+
* Copyright (c) 2018 Elasticsearch BV. All Rights Reserved.
5+
*
6+
* Notice: this software, and all information contained
7+
* therein, is the exclusive property of Elasticsearch BV
8+
* and its licensors, if any, and is protected under applicable
9+
* domestic and foreign law, and international treaties.
10+
*
11+
* Reproduction, republication or distribution without the
12+
* express written consent of Elasticsearch BV is
13+
* strictly prohibited.
14+
*/
15+
#ifndef INCLUDED_ml_api_CModelSizeStatsJsonWriter_h
16+
#define INCLUDED_ml_api_CModelSizeStatsJsonWriter_h
17+
18+
#include <core/CNonInstantiatable.h>
19+
#include <core/CRapidJsonConcurrentLineWriter.h>
20+
21+
#include <model/CResourceMonitor.h>
22+
23+
#include <api/ImportExport.h>
24+
25+
#include <string>
26+
27+
namespace ml
28+
{
29+
namespace api
30+
{
31+
32+
//! \brief
33+
//! A static utility for writing the model_size_stats document in JSON.
34+
class API_EXPORT CModelSizeStatsJsonWriter : private core::CNonInstantiatable
35+
{
36+
public:
37+
//! Writes the model size stats in the \p results in JSON format.
38+
static void write(const std::string &jobId,
39+
const model::CResourceMonitor::SResults &results,
40+
core::CRapidJsonConcurrentLineWriter &writer);
41+
};
42+
43+
}
44+
}
45+
46+
#endif // INCLUDED_ml_api_CModelSizeStatsJsonWriter_h
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* ELASTICSEARCH CONFIDENTIAL
3+
*
4+
* Copyright (c) 2018 Elasticsearch BV. All Rights Reserved.
5+
*
6+
* Notice: this software, and all information contained
7+
* therein, is the exclusive property of Elasticsearch BV
8+
* and its licensors, if any, and is protected under applicable
9+
* domestic and foreign law, and international treaties.
10+
*
11+
* Reproduction, republication or distribution without the
12+
* express written consent of Elasticsearch BV is
13+
* strictly prohibited.
14+
*/
15+
#ifndef INCLUDED_ml_api_CModelSnapshotJsonWriter_h
16+
#define INCLUDED_ml_api_CModelSnapshotJsonWriter_h
17+
18+
#include <core/CJsonOutputStreamWrapper.h>
19+
#include <core/CRapidJsonConcurrentLineWriter.h>
20+
21+
#include <model/CResourceMonitor.h>
22+
23+
#include <api/ImportExport.h>
24+
25+
#include <string>
26+
27+
namespace ml
28+
{
29+
namespace api
30+
{
31+
32+
//! \brief
33+
//! Write model snapshots in JSON format
34+
//!
35+
//! DESCRIPTION:\n
36+
//! Outputs the model snapshot documents that accompany each state persist.
37+
//!
38+
class API_EXPORT CModelSnapshotJsonWriter
39+
{
40+
public:
41+
//! Structure to store the model snapshot metadata
42+
struct SModelSnapshotReport
43+
{
44+
core_t::TTime s_SnapshotTimestamp;
45+
std::string s_Description;
46+
std::string s_SnapshotId;
47+
size_t s_NumDocs;
48+
model::CResourceMonitor::SResults s_ModelSizeStats;
49+
std::string s_NormalizerState;
50+
core_t::TTime s_LatestRecordTime;
51+
core_t::TTime s_LatestFinalResultTime;
52+
};
53+
54+
public:
55+
//! Constructor that causes output to be written to the specified wrapped stream
56+
CModelSnapshotJsonWriter(const std::string &jobId,
57+
core::CJsonOutputStreamWrapper &strmOut);
58+
59+
//! Writes the given model snapshot in JSON format.
60+
void write(const SModelSnapshotReport &report);
61+
62+
//! Write the quantile's state
63+
static void writeQuantileState(const std::string &jobId,
64+
const std::string &state,
65+
core_t::TTime timestamp,
66+
core::CRapidJsonConcurrentLineWriter &writer);
67+
68+
private:
69+
//! The job ID
70+
std::string m_JobId;
71+
72+
//! JSON line writer
73+
core::CRapidJsonConcurrentLineWriter m_Writer;
74+
};
75+
76+
77+
}
78+
}
79+
80+
#endif // INCLUDED_ml_api_CModelSnapshotJsonWriter_h

0 commit comments

Comments
 (0)