Skip to content

Commit 030b601

Browse files
authored
YMQ JSON API: send metering events (24-3) (#12635)
1 parent df8f59f commit 030b601

File tree

5 files changed

+116
-10
lines changed

5 files changed

+116
-10
lines changed

ydb/core/http_proxy/http_req.cpp

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include <ydb/library/folder_service/events.h>
6363

6464
#include <ydb/core/ymq/actor/auth_multi_factory.h>
65+
#include <ydb/core/ymq/actor/serviceid.h>
6566

6667
#include <ydb/library/http_proxy/error/error.h>
6768

@@ -438,8 +439,8 @@ namespace NKikimr::NHttpProxy {
438439
<< " CloudId: " << ev->Get()->CloudId
439440
<< " UserSid: " << ev->Get()->Sid;
440441
);
441-
FolderId = ev->Get()->FolderId;
442-
CloudId = ev->Get()->CloudId;
442+
HttpContext.FolderId = FolderId = ev->Get()->FolderId;
443+
HttpContext.CloudId = CloudId = ev->Get()->CloudId;
443444
UserSid = ev->Get()->Sid;
444445
SendGrpcRequestNoDriver(ctx);
445446
} else {
@@ -473,7 +474,8 @@ namespace NKikimr::NHttpProxy {
473474
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
474475
}
475476
CloudId = cloudIdAndResourceId.first;
476-
ResourceId = cloudIdAndResourceId.second;
477+
HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second;
478+
HttpContext.ResponseData.YmqIsFifo = queueUrl.EndsWith(".fifo");
477479
}
478480
} catch (const NKikimr::NSQS::TSQSException& e) {
479481
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
@@ -1248,6 +1250,38 @@ namespace NKikimr::NHttpProxy {
12481250
ResponseData.DumpBody(ContentType)
12491251
);
12501252

1253+
if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) {
1254+
// Send request attributes to the metering actor
1255+
auto reportRequestAttributes = MakeHolder<NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>();
1256+
1257+
auto& requestAttributes = reportRequestAttributes->Data;
1258+
1259+
requestAttributes.HttpStatusCode = httpCode;
1260+
requestAttributes.IsFifo = ResponseData.YmqIsFifo;
1261+
requestAttributes.FolderId = FolderId;
1262+
requestAttributes.RequestSizeInBytes = Request->Size();
1263+
requestAttributes.ResponseSizeInBytes = response->Size();
1264+
requestAttributes.SourceAddress = SourceAddress;
1265+
requestAttributes.ResourceId = ResourceId;
1266+
requestAttributes.Action = NSQS::ActionFromString(MethodName);
1267+
1268+
LOG_SP_DEBUG_S(
1269+
ctx,
1270+
NKikimrServices::HTTP_PROXY,
1271+
TStringBuilder() << "Send metering event."
1272+
<< " HttpStatusCode: " << requestAttributes.HttpStatusCode
1273+
<< " IsFifo: " << requestAttributes.IsFifo
1274+
<< " FolderId: " << requestAttributes.FolderId
1275+
<< " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
1276+
<< " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
1277+
<< " SourceAddress: " << requestAttributes.SourceAddress
1278+
<< " ResourceId: " << requestAttributes.ResourceId
1279+
<< " Action: " << requestAttributes.Action
1280+
);
1281+
1282+
ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
1283+
}
1284+
12511285
ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
12521286
}
12531287

ydb/core/http_proxy/http_req.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ struct THttpResponseData {
5858
TString ErrorText{"OK"};
5959
TString YmqStatusCode;
6060
ui32 YmqHttpCode;
61+
bool YmqIsFifo;
6162

6263
TString DumpBody(MimeTypes contentType);
6364
};
@@ -83,6 +84,7 @@ struct THttpRequestContext {
8384
TString FolderId; // not in context
8485
TString CloudId; // not in context
8586
TString StreamName; // not in context
87+
TString ResourceId;
8688
TString SourceAddress;
8789
TString MethodName; // used once
8890
TString ApiVersion; // used once

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/library/grpc/server/actors/logger.h>
66
#include <library/cpp/http/misc/parsed_request.h>
77
#include <library/cpp/json/json_writer.h>
8+
#include <library/cpp/logger/global/global.h>
89
#include <library/cpp/resource/resource.h>
910
#include <library/cpp/testing/unittest/registar.h>
1011

@@ -81,10 +82,10 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
8182
InitAll();
8283
}
8384

84-
void InitAll(bool yandexCloudMode = true) {
85+
void InitAll(bool yandexCloudMode = true, bool enableMetering = false) {
8586
AccessServicePort = PortManager.GetPort(8443);
8687
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
87-
InitKikimr(yandexCloudMode);
88+
InitKikimr(yandexCloudMode, enableMetering);
8889
InitAccessServiceService();
8990
InitHttpServer(yandexCloudMode);
9091
}
@@ -365,7 +366,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
365366
return resultSet;
366367
}
367368

368-
void InitKikimr(bool yandexCloudMode) {
369+
void InitKikimr(bool yandexCloudMode, bool enableMetering) {
369370
AuthFactory = std::make_shared<TIamAuthFactory>();
370371
NKikimrConfig::TAppConfig appConfig;
371372
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
@@ -379,6 +380,21 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
379380
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
380381
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);
381382

383+
if (enableMetering) {
384+
auto& sqsConfig = *appConfig.MutableSqsConfig();
385+
386+
sqsConfig.SetMeteringFlushingIntervalMs(100);
387+
sqsConfig.SetMeteringLogFilePath("sqs_metering.log");
388+
TFsPath(sqsConfig.GetMeteringLogFilePath()).DeleteIfExists();
389+
390+
sqsConfig.AddMeteringCloudNetCidr("5.45.196.0/24");
391+
sqsConfig.AddMeteringCloudNetCidr("2a0d:d6c0::/29");
392+
sqsConfig.AddMeteringYandexNetCidr("127.0.0.0/8");
393+
sqsConfig.AddMeteringYandexNetCidr("5.45.217.0/24");
394+
395+
DoInitGlobalLog(CreateOwningThreadedLogBackend(sqsConfig.GetMeteringLogFilePath(), 0));
396+
}
397+
382398
auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
383399
limit->SetMinPeriodSeconds(0);
384400
limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds());
@@ -414,6 +430,13 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
414430
ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG);
415431
ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);
416432

433+
if (enableMetering) {
434+
ActorRuntime->RegisterService(
435+
NSQS::MakeSqsMeteringServiceID(),
436+
ActorRuntime->Register(NSQS::CreateSqsMeteringService())
437+
);
438+
}
439+
417440
NYdb::TClient client(*(KikimrServer->ServerSettings));
418441
UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
419442
client.AlterUserAttributes("/", "Root", {{"folder_id", "folder4"},
@@ -477,7 +500,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
477500
);
478501

479502
client.MkDir("/Root/SQS", ".FIFO");
480-
client.CreateTable("/Root/SQS/.FIFO",
503+
client.CreateTable("/Root/SQS/.FIFO",
481504
"Name: \"Messages\""
482505
"Columns { Name: \"QueueIdNumberHash\" Type: \"Uint64\"}"
483506
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
@@ -537,7 +560,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
537560
"KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]"
538561
);
539562

540-
auto stateTableCommon =
563+
auto stateTableCommon =
541564
"Name: \"State\""
542565
"Columns { Name: \"QueueIdNumberHash\" Type: \"Uint64\"}"
543566
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
@@ -581,7 +604,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
581604
"KeyColumnNames: [\"QueueIdNumberAndShardHash\", \"QueueIdNumber\", \"Shard\", \"Offset\"]"
582605
);
583606

584-
auto sentTimestampIdxCommonColumns=
607+
auto sentTimestampIdxCommonColumns=
585608
"Columns { Name: \"QueueIdNumberAndShardHash\" Type: \"Uint64\"}"
586609
"Columns { Name: \"QueueIdNumber\" Type: \"Uint64\"}"
587610
"Columns { Name: \"Shard\" Type: \"Uint32\"}"
@@ -700,7 +723,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
700723
folderServiceConfig.SetEnable(false);
701724
actorId = as->Register(NKikimr::NFolderService::CreateFolderServiceActor(folderServiceConfig, "cloud4"));
702725
as->RegisterLocalService(NFolderService::FolderServiceActorId(), actorId);
703-
726+
704727
actorId = as->Register(NKikimr::NFolderService::CreateFolderServiceActor(folderServiceConfig, "cloud4"));
705728
as->RegisterLocalService(NSQS::MakeSqsFolderServiceID(), actorId);
706729

@@ -766,3 +789,9 @@ class THttpProxyTestMockForSQS : public THttpProxyTestMock {
766789
InitAll(false);
767790
}
768791
};
792+
793+
class THttpProxyTestMockWithMetering : public THttpProxyTestMock {
794+
void SetUp(NUnitTest::TTestContext&) override {
795+
InitAll(true, true);
796+
}
797+
};

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,6 +1663,35 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16631663
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
16641664
}
16651665

1666+
Y_UNIT_TEST_F(BillingRecordsForJsonApi, THttpProxyTestMockWithMetering) {
1667+
auto createQueueReq = CreateSqsCreateQueueRequest();
1668+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1669+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1670+
NJson::TJsonValue json;
1671+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1672+
TString queueUrl = GetByPath<TString>(json, "QueueUrl");
1673+
UNIT_ASSERT(queueUrl.EndsWith("ExampleQueueName"));
1674+
1675+
NJson::TJsonValue sendMessageReq;
1676+
sendMessageReq["QueueUrl"] = queueUrl;
1677+
auto body = "MessageBody-0";
1678+
sendMessageReq["MessageBody"] = body;
1679+
sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1680+
sendMessageReq["MessageGroupId"] = "MessageGroupId-0";
1681+
1682+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
1683+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1684+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1685+
UNIT_ASSERT(!GetByPath<TString>(json, "SequenceNumber").empty());
1686+
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
1687+
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
1688+
1689+
// TODO:
1690+
// Sleep(TDuration::Seconds(500));
1691+
// TVector<NSc::TValue> records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath());
1692+
// CheckBillingRecord(records, expectedRecords);
1693+
}
1694+
16661695
Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
16671696
NJson::TJsonValue sendMessageReq;
16681697
sendMessageReq["QueueUrl"] = "";

ydb/core/ymq/http/http.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,18 @@ void THttpRequest::WriteResponse(const TReplyParams& replyParams, const TSqsHttp
164164
requestAttributes.ResourceId = response.ResourceId;
165165
requestAttributes.Action = Action_;
166166

167+
RLOG_SQS_BASE_DEBUG(*Parent_->ActorSystem_,
168+
TStringBuilder() << "Send metering event."
169+
<< " HttpStatusCode: " << requestAttributes.HttpStatusCode
170+
<< " IsFifo: " << requestAttributes.IsFifo
171+
<< " FolderId: " << requestAttributes.FolderId
172+
<< " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes
173+
<< " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes
174+
<< " SourceAddress: " << requestAttributes.SourceAddress
175+
<< " ResourceId: " << requestAttributes.ResourceId
176+
<< " Action: " << requestAttributes.Action
177+
);
178+
167179
Parent_->ActorSystem_->Send(MakeSqsMeteringServiceID(), reportRequestAttributes.Release());
168180
}
169181

0 commit comments

Comments
 (0)