Skip to content

YMQ fixes for 24-3 #9646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ namespace NKikimr::NHttpProxy {
.Counters = nullptr,
.AWSSignature = std::move(HttpContext.GetSignature()),
.IAMToken = HttpContext.IamToken,
.FolderID = ""
.FolderID = HttpContext.FolderId
};

auto authRequestProxy = MakeHolder<NSQS::THttpProxyAuthRequestProxy>(
Expand Down Expand Up @@ -1148,10 +1148,15 @@ namespace NKikimr::NHttpProxy {
SourceAddress = address;
}

DatabasePath = Request->URL;
DatabasePath = Request->URL.Before('?');
if (DatabasePath == "/") {
DatabasePath = "";
}
auto params = TCgiParameters(Request->URL.After('?'));
if (auto it = params.Find("folderId"); it != params.end()) {
FolderId = it->second;
}

//TODO: find out databaseId
ParseHeaders(Request->Headers);
}
Expand Down
82 changes: 69 additions & 13 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,20 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT_VALUES_EQUAL(resultMessage, "The specified queue doesn't exist.");
}

Y_UNIT_TEST_F(TestGetQueueUrlWithIAM, THttpProxyTestMock) {
auto req = CreateSqsGetQueueUrlRequest();
req["QueueName"] = "not-existing-queue";
auto res = SendHttpRequest("/Root?folderId=XXX", "AmazonSQS.GetQueueUrl", std::move(req), "X-YaCloud-SubjectToken: Bearer proxy_sa@builtin");
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);

NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultType = GetByPath<TString>(json, "__type");
UNIT_ASSERT_VALUES_EQUAL(resultType, "AWS.SimpleQueueService.NonExistentQueue");
TString resultMessage = GetByPath<TString>(json, "message");
UNIT_ASSERT_VALUES_EQUAL(resultMessage, "The specified queue doesn't exist.");
}

Y_UNIT_TEST_F(TestSendMessage, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
Expand All @@ -1645,7 +1659,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "SequenceNumber").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
}

Expand All @@ -1666,7 +1680,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {

res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

for (int i = 0; i < 20; ++i) {
Expand Down Expand Up @@ -1698,16 +1712,58 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));

NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"DelaySeconds"};
getQueueAttributes["AttributeNames"] = attributeNames;
{
NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"DelaySeconds"};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
}

{
NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesDelayed",
"ApproximateNumberOfMessagesNotVisible",
"CreatedTimestamp",
"DelaySeconds",
"MaximumMessageSize",
"MessageRetentionPeriod",
"ReceiveMessageWaitTimeSeconds",
"RedrivePolicy",
"VisibilityTimeout",
"FifoQueue",
"ContentBasedDeduplication",
"QueueArn"
};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
}

{
NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"All"};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
}
}

Y_UNIT_TEST_F(TestListQueues, THttpProxyTestMock) {
Expand Down Expand Up @@ -1911,8 +1967,8 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
auto succesful0 = json["Successful"][0];
UNIT_ASSERT(succesful0["Id"] == "Id-0");
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageAttributes").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MD5OfMessageAttributes").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MD5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
}

Expand Down
18 changes: 9 additions & 9 deletions ydb/public/api/protos/draft/ymq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ message GetQueueAttributesResult {
message GetQueueUrlRequest {
Ydb.Operations.OperationParams operation_params = 1;
string queue_name = 2;
optional string queue_owner_aws_account_id = 3;
optional string queue_owner_a_w_s_account_id = 3;
}

message GetQueueUrlResponse {
Expand Down Expand Up @@ -194,8 +194,8 @@ message ReceiveMessageResponse {
message Message {
map<string, string> attributes = 1;
string body = 2;
string md5_of_body = 3;
string md5_of_message_attributes = 4;
string m_d_5_of_body = 3;
string m_d_5_of_message_attributes = 4;
map<string, MessageAttribute> message_attributes = 5;
string message_id = 6;
string receipt_handle = 7;
Expand All @@ -221,9 +221,9 @@ message SendMessageResponse {
}

message SendMessageResult {
string md5_of_message_attributes = 1;
string md5_of_message_body= 2;
string md5_of_message_system_attributes= 3;
string m_d_5_of_message_attributes = 1;
string m_d_5_of_message_body= 2;
string m_d_5_of_message_system_attributes= 3;
string message_id = 4;
string sequence_number = 5;
}
Expand All @@ -248,10 +248,10 @@ message SendMessageBatchRequestEntry {

message SendMessageBatchResultEntry {
string id = 1;
string md5_of_message_body = 2;
string m_d_5_of_message_body = 2;
string message_id = 3;
string md5_of_message_attributes = 4;
string md5_of_message_system_attributes = 5;
string m_d_5_of_message_attributes = 4;
string m_d_5_of_message_system_attributes = 5;
string sequence_number = 6;
}

Expand Down
114 changes: 40 additions & 74 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ namespace NKikimr::NYmq::V1 {

Ydb::Ymq::V1::SendMessageResult GetResult(const NKikimrClient::TSqsResponse& resp) override {
Ydb::Ymq::V1::SendMessageResult result;
result.set_md5_of_message_attributes(GetResponse(resp).GetMD5OfMessageAttributes());
result.set_md5_of_message_body(GetResponse(resp).GetMD5OfMessageBody());
result.set_m_d_5_of_message_attributes(GetResponse(resp).GetMD5OfMessageAttributes());
result.set_m_d_5_of_message_body(GetResponse(resp).GetMD5OfMessageBody());
result.set_message_id(GetResponse(resp).GetMessageId());
result.set_sequence_number(std::to_string(GetResponse(resp).GetSequenceNumber()));
return result;
Expand Down Expand Up @@ -347,8 +347,8 @@ namespace NKikimr::NYmq::V1 {
}

dstMessage.set_body(srcMessage.GetData());
dstMessage.set_md5_of_body(srcMessage.GetMD5OfMessageBody());
dstMessage.set_md5_of_message_attributes(srcMessage.GetMD5OfMessageAttributes());
dstMessage.set_m_d_5_of_body(srcMessage.GetMD5OfMessageBody());
dstMessage.set_m_d_5_of_message_attributes(srcMessage.GetMD5OfMessageAttributes());

for (const auto& srcAttribute: srcMessage.GetMessageAttributes()) {
Ydb::Ymq::V1::MessageAttribute dstAttribute;
Expand Down Expand Up @@ -448,75 +448,41 @@ namespace NKikimr::NYmq::V1 {

Ydb::Ymq::V1::GetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse& resp) override {
Ydb::Ymq::V1::GetQueueAttributesResult result;
for (const auto& attributeName : Attributes) {
if (attributeName == APPROXIMATE_NUMBER_OF_MESSAGES) {
AddAttribute(
result,
APPROXIMATE_NUMBER_OF_MESSAGES,
GetResponse(resp).GetApproximateNumberOfMessages()
);
} else if (attributeName == APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED) {
AddAttribute(
result,
APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED,
GetResponse(resp).GetApproximateNumberOfMessagesDelayed()
);
} else if (attributeName == CREATED_TIMESTAMP) {
AddAttribute(
result,
CREATED_TIMESTAMP,
GetResponse(resp).GetCreatedTimestamp()
);
} else if (attributeName == DELAY_SECONDS) {
AddAttribute(
result,
DELAY_SECONDS,
GetResponse(resp).GetDelaySeconds()
);
} else if (attributeName == LAST_MODIFIED_TIMESTAMP) {
AddAttribute(
result,
LAST_MODIFIED_TIMESTAMP,
GetResponse(resp).GetLastModifiedTimestamp()
);
} else if (attributeName == MAXIMUM_MESSAGE_SIZE) {
AddAttribute(
result,
MAXIMUM_MESSAGE_SIZE,
GetResponse(resp).GetMaximumMessageSize()
);
} else if (attributeName == MESSAGE_RETENTION_PERIOD) {
AddAttribute(
result,
MESSAGE_RETENTION_PERIOD,
GetResponse(resp).GetMessageRetentionPeriod()
);
} else if (attributeName == QUEUE_ARN) {
AddAttribute(
result,
QUEUE_ARN,
GetResponse(resp).GetQueueArn()
);
} else if (attributeName == RECEIVE_MESSAGE_WAIT_TIME_SECONDS) {
AddAttribute(
result,
RECEIVE_MESSAGE_WAIT_TIME_SECONDS,
GetResponse(resp).GetReceiveMessageWaitTimeSeconds()
);
} else if (attributeName == VISIBILITY_TIMEOUT) {
AddAttribute(
result,
VISIBILITY_TIMEOUT,
GetResponse(resp).GetVisibilityTimeout()
);
} else if (attributeName == REDRIVE_POLICY) {
AddAttribute(
result,
REDRIVE_POLICY,
GetResponse(resp).GetRedrivePolicy()
);
}
const auto& attrs = resp.GetGetQueueAttributes();
if (attrs.HasApproximateNumberOfMessages()) {
AddAttribute(result, APPROXIMATE_NUMBER_OF_MESSAGES, attrs.GetApproximateNumberOfMessages());
}
if (attrs.HasApproximateNumberOfMessagesDelayed()) {
AddAttribute(result, APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, attrs.GetApproximateNumberOfMessagesDelayed());
}
if (attrs.HasCreatedTimestamp()) {
AddAttribute(result, CREATED_TIMESTAMP, attrs.GetCreatedTimestamp());
}
if (attrs.HasDelaySeconds()) {
AddAttribute(result, DELAY_SECONDS, attrs.GetDelaySeconds());
}
if (attrs.HasLastModifiedTimestamp()) {
AddAttribute(result, LAST_MODIFIED_TIMESTAMP, attrs.GetLastModifiedTimestamp());
}
if (attrs.HasMaximumMessageSize()) {
AddAttribute(result, MAXIMUM_MESSAGE_SIZE, attrs.GetMaximumMessageSize());
}
if (attrs.HasMessageRetentionPeriod()) {
AddAttribute(result, MESSAGE_RETENTION_PERIOD, attrs.GetMessageRetentionPeriod());
}
if (attrs.HasQueueArn()) {
AddAttribute(result, QUEUE_ARN, attrs.GetQueueArn());
}
if (attrs.HasReceiveMessageWaitTimeSeconds()) {
AddAttribute(result, RECEIVE_MESSAGE_WAIT_TIME_SECONDS, attrs.GetReceiveMessageWaitTimeSeconds());
}
if (attrs.HasVisibilityTimeout()) {
AddAttribute(result, VISIBILITY_TIMEOUT, attrs.GetVisibilityTimeout());
}
if (attrs.HasRedrivePolicy()) {
AddAttribute(result, REDRIVE_POLICY, attrs.GetRedrivePolicy());
}

return result;
}

Expand Down Expand Up @@ -819,8 +785,8 @@ namespace NKikimr::NYmq::V1 {
} else {
auto currentSuccessful = result.Addsuccessful();
currentSuccessful->Setid(entry.GetId());
currentSuccessful->Setmd5_of_message_attributes(entry.GetMD5OfMessageAttributes());
currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody());
currentSuccessful->set_m_d_5_of_message_attributes(entry.GetMD5OfMessageAttributes());
currentSuccessful->set_m_d_5_of_message_body(entry.GetMD5OfMessageBody());
currentSuccessful->Setmessage_id(entry.GetMessageId());
currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber()));
}
Expand Down
Loading