Skip to content

Commit b1871aa

Browse files
authored
[6.x][ML] Store expanding window bucket values in a compressed format (#104)
1 parent 13aee89 commit b1871aa

22 files changed

+893
-347
lines changed

docs/CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Improve robustness w.r.t. outliers of detection and initialisation of seasonal c
2020
Improve behavior when there are abrupt changes in the seasonal components present in a time series ({pull}91[#91])
2121
Explicit change point detection and modelling ({pull}92[#92])
2222
Improve partition analysis memory usage ({pull}97[#97])
23+
Reduce model memory by storing state for periodicity testing in a compressed format ({pull}100[#100])
2324

2425
Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing
2526
model state on disk ({pull}89[#89])

include/core/CCompressUtils.h

Lines changed: 0 additions & 92 deletions
This file was deleted.

include/core/CStringSimilarityTester.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
#ifndef INCLUDED_ml_core_CStringSimilarityTester_h
77
#define INCLUDED_ml_core_CStringSimilarityTester_h
88

9-
#include <core/CCompressUtils.h>
109
#include <core/CLogger.h>
1110
#include <core/CNonCopyable.h>
11+
#include <core/CompressUtils.h>
1212
#include <core/ImportExport.h>
1313

1414
#include <boost/scoped_array.hpp>
@@ -440,7 +440,7 @@ class CORE_EXPORT CStringSimilarityTester : private CNonCopyable {
440440
static const int MINUS_INFINITE_INT;
441441

442442
//! Used by the compression-based similarity measures
443-
mutable CCompressUtils m_Compressor;
443+
mutable CDeflator m_Compressor;
444444

445445
// For unit testing
446446
friend class ::CStringSimilarityTesterTest;

include/core/CompressUtils.h

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
#ifndef INCLUDED_ml_core_CCompressUtils_h
7+
#define INCLUDED_ml_core_CCompressUtils_h
8+
9+
#include <core/CNonCopyable.h>
10+
#include <core/ImportExport.h>
11+
12+
#include <zlib.h>
13+
14+
#include <string>
15+
#include <vector>
16+
17+
namespace ml {
18+
namespace core {
19+
20+
//! \brief
21+
//! Shrink wrap zlib calls.
22+
//!
23+
//! DESCRIPTION:\n
24+
//! Shrink wrap zlib calls.
25+
//!
26+
//! IMPLEMENTATION DECISIONS:\n
27+
//! Data can be added incrementally and then 'finished' to
28+
//! complete deflation or inflation.
29+
//!
30+
//! This object retains in memory the entire compressed state
31+
//! so it not good for file read/write.
32+
//!
33+
//! A single Z stream is used for the lifetime of the object,
34+
//! so each object can only work on one task at a time. In
35+
//! a multi-threaded application it would be best to create
36+
//! one object for each thread.
37+
//!
38+
class CORE_EXPORT CCompressUtil : private CNonCopyable {
39+
public:
40+
using TByteVec = std::vector<Bytef>;
41+
42+
public:
43+
CCompressUtil(bool lengthOnly);
44+
virtual ~CCompressUtil() = default;
45+
46+
//! Add a string.
47+
//!
48+
//! \note Multiple calls to this function without finishing
49+
//! are equivalent to deflating or inflating the concatenation
50+
//! of the strings passed in the order they are passed.
51+
bool addString(const std::string& input);
52+
53+
//! Add a vector of trivially copyable types.
54+
//!
55+
//! \note Multiple calls to this function without finishing
56+
//! are equivalent to deflating or inflating the concatenation
57+
//! of the vectors passed in the order they are passed.
58+
template<typename T>
59+
bool addVector(const std::vector<T>& input) {
60+
static_assert(std::is_trivially_copyable<T>::value, "Type must be trivially copyable");
61+
if (input.empty()) {
62+
return true;
63+
}
64+
if (m_State == E_Finished) {
65+
// If the last round of data processing has finished
66+
// and we're adding a new vector then we need to reset
67+
// the stream so that a new round starts from scratch.
68+
this->reset();
69+
}
70+
return this->processInput(false, input);
71+
}
72+
73+
//! Get transformed representation.
74+
//!
75+
//! \warning This will fail if the lengthOnly constructor argument
76+
//! was set to true.
77+
//!
78+
//! \note The output representation is a byte array NOT a string,
79+
//! and hence not printable.
80+
//!
81+
//! If finish==false then retrieve partial state.
82+
bool data(bool finish, TByteVec& result);
83+
84+
//! Get transformed representation.
85+
//!
86+
//! \note This is equivalent to calling data with finish==true, but
87+
//! also takes the cached state (avoiding the copy).
88+
bool finishAndTakeData(TByteVec& result);
89+
90+
//! Get transformed data length.
91+
//!
92+
//! If finish==false then retrieve partial length.
93+
bool length(bool finish, std::size_t& length);
94+
95+
//! Reset the underlying stream. This will happen automatically
96+
//! when adding a new string after having finished the previous
97+
//! round, but sometimes, for example when recovering from an
98+
//! error, it may be desirable to explicitly reset the state.
99+
void reset();
100+
101+
protected:
102+
//! Get the underlying stream.
103+
z_stream& stream();
104+
105+
private:
106+
enum EState { E_Unused, E_Active, E_Finished };
107+
108+
private:
109+
static const std::size_t CHUNK_SIZE{4096};
110+
111+
private:
112+
//! Get an unsigned character pointer to the address of the start
113+
//! of the vector data.
114+
template<typename T>
115+
static Bytef* bytes(const std::vector<T>& input) {
116+
return const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input.data()));
117+
}
118+
119+
//! Get an unsigned character pointer to the address of the start
120+
//! of the string character array.
121+
static Bytef* bytes(const std::string& input) {
122+
return reinterpret_cast<Bytef*>(const_cast<char*>(input.data()));
123+
}
124+
125+
//! Get the vector data size in bytes.
126+
template<typename T>
127+
static uInt size(const std::vector<T>& input) {
128+
return static_cast<uInt>(input.size() * sizeof(T));
129+
}
130+
131+
//! Get the string size in bytes.
132+
static uInt size(const std::string& input) {
133+
return static_cast<uInt>(input.size());
134+
}
135+
136+
//! Process a chunk of state (optionally flushing).
137+
bool processChunk(int flush);
138+
139+
//! Process the input \p input in chunks.
140+
template<typename T>
141+
bool processInput(bool finish, const T& input) {
142+
if (input.empty() && m_State == E_Active && !finish) {
143+
return true;
144+
}
145+
146+
m_State = E_Active;
147+
148+
m_ZlibStrm.next_in = bytes(input);
149+
m_ZlibStrm.avail_in = size(input);
150+
151+
int flush{finish ? Z_FINISH : Z_NO_FLUSH};
152+
do {
153+
if (this->processChunk(flush) == false) {
154+
return false;
155+
}
156+
} while (m_ZlibStrm.avail_out == 0);
157+
158+
m_State = finish ? E_Finished : E_Active;
159+
160+
return true;
161+
}
162+
163+
//! Preparation before returning any data.
164+
bool prepareToReturnData(bool finish);
165+
166+
//! Process a chunk with the stream.
167+
virtual int streamProcessChunk(int flush) = 0;
168+
169+
//! Reset the underlying stream.
170+
virtual int resetStream() = 0;
171+
172+
private:
173+
//! The current state of deflation or inflation.
174+
EState m_State;
175+
176+
//! Is this object only fit for getting output lengths?
177+
bool m_LengthOnly;
178+
179+
//! The buffer for a chunk of output from (de|in)flation.
180+
Bytef m_Chunk[CHUNK_SIZE];
181+
182+
//! The output buffer when the compressed result is being
183+
//! stored.
184+
TByteVec m_FullResult;
185+
186+
//! The zlib data structure.
187+
z_stream m_ZlibStrm;
188+
};
189+
190+
//! \brief Implementation of CompressUtil for deflating data.
191+
class CORE_EXPORT CDeflator final : public CCompressUtil {
192+
public:
193+
CDeflator(bool lengthOnly, int level = Z_DEFAULT_COMPRESSION);
194+
~CDeflator();
195+
196+
private:
197+
//! Process a chunk of state (optionally flushing).
198+
virtual int streamProcessChunk(int flush);
199+
//! Reset the underlying stream.
200+
virtual int resetStream();
201+
};
202+
203+
//! \brief Implementation of CompressUtil for inflating data.
204+
class CORE_EXPORT CInflator final : public CCompressUtil {
205+
public:
206+
CInflator(bool lengthOnly);
207+
~CInflator();
208+
209+
private:
210+
//! Process a chunk of state (optionally flushing).
211+
virtual int streamProcessChunk(int flush);
212+
//! Reset the underlying stream.
213+
virtual int resetStream();
214+
};
215+
}
216+
}
217+
218+
#endif // INCLUDED_ml_core_CCompressUtils_h

0 commit comments

Comments
 (0)