1+ #include " solomon_accessor_client.h"
2+
3+ #include < library/cpp/http/simple/http_client.h>
4+ #include < library/cpp/protobuf/interop/cast.h>
5+ #include < library/cpp/threading/future/core/future.h>
6+ #include < ydb/public/sdk/cpp/src/library/grpc/client/grpc_client_low.h>
7+ #include < yql/essentials/utils/url_builder.h>
8+ #include < yql/essentials/utils/yql_panic.h>
9+
10+ #include < ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.pb.h>
11+ #include < ydb/library/yql/providers/common/solomon_accessor/grpc/solomon_accessor_pb.grpc.pb.h>
12+
13+ namespace NYql ::NSo {
14+
15+ using namespace yandex ::monitoring::api::v3;
16+
17+ namespace {
18+
19+ Downsampling::GapFilling ParseGapFilling (const TString &fill)
20+ {
21+ if (fill.equal (" NULL" )) {
22+ return Downsampling::GAP_FILLING_NULL;
23+ }
24+ if (fill.equal (" NONE" )) {
25+ return Downsampling::GAP_FILLING_NONE;
26+ }
27+ if (fill.equal (" PREVIOUS" )) {
28+ return Downsampling::GAP_FILLING_PREVIOUS;
29+ }
30+ return Downsampling::GAP_FILLING_UNSPECIFIED;
31+ }
32+
33+ Downsampling::GridAggregation ParseGridAggregation (const TString &aggregation)
34+ {
35+ if (aggregation.equal (" MAX" )) {
36+ return Downsampling::GRID_AGGREGATION_MAX;
37+ }
38+ if (aggregation.equal (" MIN" )) {
39+ return Downsampling::GRID_AGGREGATION_MIN;
40+ }
41+ if (aggregation.equal (" SUM" )) {
42+ return Downsampling::GRID_AGGREGATION_SUM;
43+ }
44+ if (aggregation.equal (" AVG" )) {
45+ return Downsampling::GRID_AGGREGATION_AVG;
46+ }
47+ if (aggregation.equal (" LAST" )) {
48+ return Downsampling::GRID_AGGREGATION_LAST;
49+ }
50+ if (aggregation.equal (" COUNT" )) {
51+ return Downsampling::GRID_AGGREGATION_COUNT;
52+ }
53+ return Downsampling::GRID_AGGREGATION_UNSPECIFIED;
54+ }
55+
56+ MetricType ParseMetricType (const TString &type)
57+ {
58+ if (type.equal (" DGAUGE" )) {
59+ return MetricType::DGAUGE;
60+ }
61+ if (type.equal (" IGAUGE" )) {
62+ return MetricType::IGAUGE;
63+ }
64+ if (type.equal (" COUNTER" )) {
65+ return MetricType::COUNTER;
66+ }
67+ if (type.equal (" RATE" )) {
68+ return MetricType::RATE;
69+ }
70+ return MetricType::METRIC_TYPE_UNSPECIFIED;
71+ }
72+
73+ class TSolomonAccessorClient : public ISolomonAccessorClient , public std ::enable_shared_from_this<TSolomonAccessorClient>
74+ {
75+ public:
76+ using THeaders = TKeepAliveHttpClient::THeaders;
77+ using THttpCode = TKeepAliveHttpClient::THttpCode;
78+
79+ TSolomonAccessorClient (
80+ NYql::NSo::NProto::TDqSolomonSource&& settings,
81+ std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider
82+ )
83+ : DefaultReplica(" sas" )
84+ , DefaultPort(443 )
85+ , Settings(std::move(settings))
86+ , CredentialsProvider(credentialsProvider)
87+ {}
88+
89+ public:
90+ TMaybe<TString> ListMetrics (
91+ const TString &selectors,
92+ int pageSize,
93+ int page,
94+ TVector<Metric> &result) override final
95+ {
96+ const auto request = BuildListMetricsRequest (selectors, pageSize, page);
97+
98+ THeaders headers;
99+ headers[" Authorization" ] = GetAuthInfo ();
100+
101+ TStringStream str;
102+ const auto httpClient = std::make_unique<TKeepAliveHttpClient>(GetHttpSolomonEndpoint (), DefaultPort);
103+ const auto retCode = httpClient->DoGet (request, &str, headers);
104+
105+ return ProcessHttpResponse (retCode, str.Str (), result);
106+ }
107+
108+ TMaybe<TString> GetData (
109+ const std::vector<TString> &selectors,
110+ std::vector<Timeseries> &results) override final
111+ {
112+ const auto request = BuildGetDataRequest (selectors);
113+
114+ NYdbGrpc::TCallMeta callMeta;
115+ callMeta.Aux .emplace_back (" authorization" , GetAuthInfo ());
116+
117+ auto resultPromise = NThreading::NewPromise<TMaybe<TString>>();
118+
119+ NYdbGrpc::TGRpcClientConfig grpcConf;
120+ grpcConf.Locator = GetGrpcSolomonEndpoint ();
121+ grpcConf.EnableSsl = Settings.GetUseSsl ();
122+ const auto grpcClient = std::make_unique<NYdbGrpc::TGRpcClientLow>();
123+ const auto connection = grpcClient->CreateGRpcServiceConnection <DataService>(grpcConf);
124+
125+ auto context = grpcClient->CreateContext ();
126+ if (!context) {
127+ throw yexception () << " Client is being shutted down" ;
128+ }
129+ std::weak_ptr<const TSolomonAccessorClient> weakSelf = shared_from_this ();
130+ // hold context until reply
131+ auto cb = [weakSelf, resultPromise, context, &results](
132+ NYdbGrpc::TGrpcStatus&& status,
133+ ReadResponse&& result) mutable
134+ {
135+ if (auto self = weakSelf.lock ()) {
136+ resultPromise.SetValue (self->ProcessGrpcResponse (std::move (status), std::move (result), results));
137+ }
138+ resultPromise.SetValue ({});
139+ };
140+
141+ connection->DoRequest <ReadRequest, ReadResponse>(
142+ std::move (request),
143+ std::move (cb),
144+ &DataService::Stub::AsyncRead,
145+ callMeta,
146+ context.get ()
147+ );
148+
149+ resultPromise.GetFuture ().Wait (TDuration::Seconds (10 ));
150+ return resultPromise.GetValue ();
151+ }
152+
153+ private:
154+ TString GetAuthInfo () const
155+ {
156+ const TString authToken = CredentialsProvider->GetAuthInfo ();
157+
158+ switch (Settings.GetClusterType ()) {
159+ case NSo::NProto::ESolomonClusterType::CT_SOLOMON:
160+ return " OAuth " + authToken;
161+ case NSo::NProto::ESolomonClusterType::CT_MONITORING:
162+ return " Bearer " + authToken;
163+ default :
164+ Y_ENSURE (false , " Invalid cluster type " << ToString<ui32>(Settings.GetClusterType ()));
165+ }
166+ }
167+
168+ TString GetHttpSolomonEndpoint () const
169+ {
170+ return (Settings.GetUseSsl () ? " https://" : " http://" ) + Settings.GetEndpoint ();
171+ }
172+
173+ TString GetGrpcSolomonEndpoint () const
174+ {
175+ return Settings.GetEndpoint () + std::to_string (DefaultPort);
176+ }
177+
178+ TString BuildListMetricsRequest (
179+ const TString &selectors,
180+ int pageSize,
181+ int page) const
182+ {
183+ TUrlBuilder builder (" " );
184+
185+ builder.AddPathComponent (" api" );
186+ builder.AddPathComponent (" v2" );
187+ builder.AddPathComponent (" projects" );
188+ builder.AddPathComponent (Settings.GetProject ());
189+ builder.AddPathComponent (" sensors" );
190+
191+ builder.AddUrlParam (" selectors" , selectors);
192+ builder.AddUrlParam (" forceCluster" , DefaultReplica);
193+ builder.AddUrlParam (" pageSize" , std::to_string (pageSize));
194+ builder.AddUrlParam (" page" , std::to_string (page));
195+
196+ return builder.Build ();
197+ }
198+
199+ ReadRequest BuildGetDataRequest (
200+ const std::vector<TString> &selectors) const
201+ {
202+ ReadRequest request;
203+
204+ request.mutable_container ()->set_project_id (Settings.GetProject ());
205+ *request.mutable_from_time () = NProtoInterop::CastToProto (TInstant::FromValue (Settings.GetFrom ()));
206+ *request.mutable_to_time () = NProtoInterop::CastToProto (TInstant::FromValue (Settings.GetTo ()));
207+ *request.mutable_force_replica () = DefaultReplica;
208+
209+ if (Settings.GetDownsampling ().GetDisabled ()) {
210+ request.mutable_downsampling ()->set_disabled (true );
211+ }
212+ else {
213+ const auto downsampling = Settings.GetDownsampling ();
214+ request.mutable_downsampling ()->set_grid_interval (downsampling.GetGridMs ());
215+ request.mutable_downsampling ()->set_grid_aggregation (ParseGridAggregation (downsampling.GetAggregation ()));
216+ request.mutable_downsampling ()->set_gap_filling (ParseGapFilling (downsampling.GetFill ()));
217+ }
218+
219+ for (const auto &metric : selectors) {
220+ auto query = request.mutable_queries ()->Add ();
221+ *query->mutable_value () = TStringBuilder{} << " {" << metric << " }" ;
222+ query->set_hidden (false );
223+ }
224+
225+ return request;
226+ }
227+
228+ TMaybe<TString> ProcessHttpResponse (
229+ THttpCode retCode,
230+ TString response,
231+ TVector<Metric> &result) const
232+ {
233+ if (retCode < 200 || retCode >= 300 ) {
234+ return TStringBuilder{} << " Error while sending request to monitoring api: " << response;
235+ }
236+
237+ NJson::TJsonValue json;
238+ try {
239+ NJson::ReadJsonTree (response, &json, /* throwOnError*/ true );
240+ } catch (const std::exception& e) {
241+ return TStringBuilder{} << " Failed to parse response from monitoring api: " << e.what ();
242+ }
243+
244+ if (!json.IsMap () || !json.Has (" result" )) {
245+ return " Invalid result from monitoring api" ;
246+ }
247+
248+ for (const auto &metricObj : json[" result" ].GetArray ()) {
249+ try {
250+ result.emplace_back (metricObj);
251+ } catch (const std::exception& e) {
252+ return TStringBuilder{} << " Failed to parse response from monitoring: " << e.what ();
253+ }
254+ }
255+
256+ return {};
257+ }
258+
259+ TMaybe<TString> ProcessGrpcResponse (
260+ NYdbGrpc::TGrpcStatus&& status,
261+ ReadResponse &&response,
262+ std::vector<Timeseries> &result) const
263+ {
264+ if (!status.Ok ()) {
265+ return TStringBuilder{} << " Error while sending request to monitoring api: " << status.Msg ;
266+ }
267+
268+ for (const auto &responseValue : response.response_per_query ()) {
269+ YQL_ENSURE (responseValue.has_timeseries_vector ());
270+ YQL_ENSURE (responseValue.timeseries_vector ().values_size () == 1 ); // one response per one set of selectors
271+
272+ const auto &queryResponse = responseValue.timeseries_vector ().values ()[0 ];
273+
274+ std::vector<int64_t > timestampValues;
275+ std::vector<double > timeseriesValues;
276+
277+ for (int64_t value : queryResponse.timestamp_values ().values ()) {
278+ timestampValues.push_back (value);
279+ }
280+ for (double value : queryResponse.double_values ().values ()) {
281+ timeseriesValues.push_back (value);
282+ }
283+
284+ result.emplace_back (queryResponse.name (), queryResponse.type (), std::move (timestampValues), std::move (timeseriesValues));
285+ }
286+
287+ return {};
288+ }
289+
290+ private:
291+ const TString DefaultReplica;
292+ const int DefaultPort;
293+
294+ NYql::NSo::NProto::TDqSolomonSource Settings;
295+ std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
296+ };
297+
298+ } // namespace
299+
300+ Metric::Metric (const NJson::TJsonValue &value)
301+ {
302+ YQL_ENSURE (value.IsMap ());
303+
304+ if (value.Has (" labels" )) {
305+ auto labels = value[" labels" ];
306+ YQL_ENSURE (labels.IsMap ());
307+
308+ for (const auto &[key, value] : labels.GetMapSafe ()) {
309+ YQL_ENSURE (value.IsString ());
310+ Labels[key] = value.GetString ();
311+ }
312+ }
313+
314+ if (value.Has (" type" )) {
315+ YQL_ENSURE (value[" type" ].IsString ());
316+ Type = ParseMetricType (value[" type" ].GetString ());
317+ }
318+
319+ if (value.Has (" createdAt" )) {
320+ YQL_ENSURE (value[" createdAt" ].IsString ());
321+ CreatedAt = value[" createdAt" ].GetString ();
322+ }
323+ }
324+
325+ ISolomonAccessorClient::TPtr
326+ ISolomonAccessorClient::Make (
327+ NYql::NSo::NProto::TDqSolomonSource&& settings,
328+ std::shared_ptr<NYdb::ICredentialsProvider> credentialsProvider)
329+ {
330+ return std::make_shared<TSolomonAccessorClient>(std::move (settings), credentialsProvider);
331+ }
332+
333+ } // namespace NYql::NSo
0 commit comments