Skip to content

Commit 6b5b451

Browse files
authored
YMQ: fix ReceiveMessage with attributes (for stable-24-3) (#10144)
1 parent aa99798 commit 6b5b451

File tree

2 files changed

+115
-37
lines changed

2 files changed

+115
-37
lines changed

ydb/core/http_proxy/ut/http_proxy_ut.h

+92-11
Original file line numberDiff line numberDiff line change
@@ -1665,7 +1665,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16651665

16661666
Y_UNIT_TEST_F(TestReceiveMessage, THttpProxyTestMock) {
16671667
auto createQueueReq = CreateSqsCreateQueueRequest();
1668-
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
1668+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));
16691669
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
16701670
NJson::TJsonValue json;
16711671
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
@@ -1674,29 +1674,110 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16741674

16751675
NJson::TJsonValue sendMessageReq;
16761676
sendMessageReq["QueueUrl"] = resultQueueUrl;
1677-
auto body = "MessageBody-0";
1678-
sendMessageReq["MessageBody"] = body;
1679-
sendMessageReq["MessageBody"] = body;
1677+
auto body0 = "MessageBody-0";
1678+
sendMessageReq["MessageBody"] = body0;
16801679

1681-
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
1680+
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", sendMessageReq, FormAuthorizationStr("ru-central1"));
16821681
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
16831682
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
16841683
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
16851684

1685+
NJson::TJsonValue receiveMessageReq;
1686+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
16861687
for (int i = 0; i < 20; ++i) {
1687-
NJson::TJsonValue receiveMessageReq;
1688-
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1689-
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
1690-
if (res.Body != TString("{}")) {
1691-
break;;
1688+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", receiveMessageReq, FormAuthorizationStr("ru-central1"));
1689+
if (res.Body != "{}") {
1690+
break;
16921691
}
16931692
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
16941693
}
16951694

16961695
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
16971696
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
16981697
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);
1699-
UNIT_ASSERT_VALUES_EQUAL(json["Messages"][0]["Body"], body);
1698+
UNIT_ASSERT_VALUES_EQUAL(json["Messages"][0]["Body"], body0);
1699+
}
1700+
1701+
Y_UNIT_TEST_F(TestReceiveMessageWithAttributes, THttpProxyTestMock) {
1702+
// Test if we process AttributeNames, MessageSystemAttributeNames, MessageAttributeNames correctly.
1703+
1704+
auto createQueueReq = CreateSqsCreateQueueRequest();
1705+
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));
1706+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1707+
NJson::TJsonValue json;
1708+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1709+
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
1710+
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));
1711+
1712+
auto sendMessage = [this, resultQueueUrl](const TString& body) {
1713+
NJson::TJsonValue sendMessageReq;
1714+
sendMessageReq["QueueUrl"] = resultQueueUrl;
1715+
sendMessageReq["MessageBody"] = body;
1716+
1717+
auto res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", sendMessageReq, FormAuthorizationStr("ru-central1"));
1718+
NJson::TJsonValue json;
1719+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1720+
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
1721+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1722+
};
1723+
1724+
TString body = "MessageBody-0";
1725+
sendMessage(body);
1726+
1727+
auto receiveMessage = [this](NJson::TJsonValue request, const TString& expectedBody) -> NJson::TJsonValue {
1728+
request["VisibilityTimeout"] = 0; // Keep the message visible for next ReceiveMessage requests.
1729+
THttpResult res;
1730+
for (int i = 0; i < 20; ++i) {
1731+
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", request, FormAuthorizationStr("ru-central1"));
1732+
if (res.Body != "{}") {
1733+
break;
1734+
}
1735+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1736+
}
1737+
1738+
NJson::TJsonValue json;
1739+
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
1740+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
1741+
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);
1742+
UNIT_ASSERT_VALUES_EQUAL(json["Messages"][0]["Body"], expectedBody);
1743+
return json;
1744+
};
1745+
1746+
{
1747+
// Request SentTimestamp message system attribute using deprecated AttributeNames field.
1748+
NJson::TJsonValue receiveMessageReq;
1749+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1750+
receiveMessageReq["AttributeNames"] = NJson::TJsonArray{"SentTimestamp"};
1751+
json = receiveMessage(receiveMessageReq, body);
1752+
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
1753+
}
1754+
1755+
{
1756+
// Request SentTimestamp message system attribute using MessageSystemAttributeNames field.
1757+
NJson::TJsonValue receiveMessageReq;
1758+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1759+
receiveMessageReq["MessageSystemAttributeNames"] = NJson::TJsonArray{"SentTimestamp"};
1760+
json = receiveMessage(receiveMessageReq, body);
1761+
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
1762+
}
1763+
1764+
{
1765+
// Request All message system attributes using deprecated AttributeNames field.
1766+
NJson::TJsonValue receiveMessageReq;
1767+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1768+
receiveMessageReq["AttributeNames"] = NJson::TJsonArray{"All"};
1769+
json = receiveMessage(receiveMessageReq, body);
1770+
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
1771+
}
1772+
1773+
{
1774+
// Request All message system attributes using MessageSystemAttributeNames field.
1775+
NJson::TJsonValue receiveMessageReq;
1776+
receiveMessageReq["QueueUrl"] = resultQueueUrl;
1777+
receiveMessageReq["MessageSystemAttributeNames"] = NJson::TJsonArray{"All"};
1778+
json = receiveMessage(receiveMessageReq, body);
1779+
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
1780+
}
17001781
}
17011782

17021783
Y_UNIT_TEST_F(TestGetQueueAttributes, THttpProxyTestMock) {

ydb/services/ymq/ymq_proxy.cpp

+23-26
Original file line numberDiff line numberDiff line change
@@ -321,29 +321,26 @@ namespace NKikimr::NYmq::V1 {
321321
for (const auto& srcMessage: GetResponse(resp).GetMessages()) {
322322
Ydb::Ymq::V1::Message dstMessage;
323323

324-
for (TString& attributeName : AttributesNames) {
325-
if (attributeName == APPROXIMATE_RECEIVE_COUNT) {
326-
dstMessage.Mutableattributes()->at(attributeName)
327-
.assign(srcMessage.GetApproximateReceiveCount());
328-
} else if (attributeName == APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) {
329-
dstMessage.Mutableattributes()->at(attributeName)
330-
.assign(srcMessage.GetApproximateFirstReceiveTimestamp());
331-
} else if (attributeName == MESSAGE_DEDUPLICATION_ID) {
332-
dstMessage.Mutableattributes()->at(attributeName)
333-
.assign(srcMessage.GetMessageDeduplicationId());
334-
} else if (attributeName == MESSAGE_GROUP_ID) {
335-
dstMessage.Mutableattributes()->at(attributeName)
336-
.assign(srcMessage.GetMessageGroupId());
337-
} else if (attributeName == SENDER_ID) {
338-
dstMessage.Mutableattributes()->at(attributeName)
339-
.assign(srcMessage.GetSenderId());
340-
} else if (attributeName == SENT_TIMESTAMP) {
341-
dstMessage.Mutableattributes()->at(attributeName)
342-
.assign(srcMessage.GetSentTimestamp());
343-
} else if (attributeName == SEQUENCE_NUMBER) {
344-
dstMessage.Mutableattributes()->at(attributeName)
345-
.assign(srcMessage.GetSequenceNumber());
346-
}
324+
if (srcMessage.HasApproximateReceiveCount()) {
325+
dstMessage.Mutableattributes()->insert({APPROXIMATE_RECEIVE_COUNT, std::to_string(srcMessage.GetApproximateReceiveCount())});
326+
}
327+
if (srcMessage.HasApproximateFirstReceiveTimestamp()) {
328+
dstMessage.Mutableattributes()->insert({APPROXIMATE_FIRST_RECEIVE_TIMESTAMP, std::to_string(srcMessage.GetApproximateFirstReceiveTimestamp())});
329+
}
330+
if (srcMessage.HasMessageDeduplicationId()) {
331+
dstMessage.Mutableattributes()->insert({MESSAGE_DEDUPLICATION_ID, srcMessage.GetMessageDeduplicationId()});
332+
}
333+
if (srcMessage.HasMessageGroupId()) {
334+
dstMessage.Mutableattributes()->insert({MESSAGE_GROUP_ID, srcMessage.GetMessageGroupId()});
335+
}
336+
if (srcMessage.HasSenderId()) {
337+
dstMessage.Mutableattributes()->insert({SENDER_ID, srcMessage.GetSenderId()});
338+
}
339+
if (srcMessage.HasSentTimestamp()) {
340+
dstMessage.Mutableattributes()->insert({SENT_TIMESTAMP, std::to_string(srcMessage.GetSentTimestamp())});
341+
}
342+
if (srcMessage.HasSequenceNumber()) {
343+
dstMessage.Mutableattributes()->insert({SEQUENCE_NUMBER, std::to_string(srcMessage.GetSequenceNumber())});
347344
}
348345

349346
dstMessage.set_body(srcMessage.GetData());
@@ -391,17 +388,17 @@ namespace NKikimr::NYmq::V1 {
391388
// because AttributeNames is deprecated in favour of SystemAttributeNames
392389
if (systemAttributeNames.size() > 0) {
393390
for (int i = 0; i < systemAttributeNames.size(); i++) {
394-
result->SetAttributeName(i, systemAttributeNames[i]);
391+
result->AddAttributeName(systemAttributeNames[i]);
395392
}
396393
} else {
397394
auto attributeNames = GetProtoRequest()->Getattribute_names();
398395
for (int i = 0; i < attributeNames.size(); i++) {
399-
result->SetAttributeName(i, attributeNames[i]);
396+
result->AddAttributeName(attributeNames[i]);
400397
}
401398
}
402399

403400
for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) {
404-
result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]);
401+
result->AddMessageAttributeName(GetProtoRequest()->Getmessage_attribute_names()[i]);
405402
}
406403

407404
return result;

0 commit comments

Comments
 (0)