Skip to content

Commit 14d776a

Browse files
committed
SQS: Handle empty QueueUrl correctly (stable-24-3) (ydb-platform#11635)
1 parent 56a2e8d commit 14d776a

File tree

7 files changed

+68
-43
lines changed

7 files changed

+68
-43
lines changed

ydb/core/http_proxy/http_req.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -468,11 +468,11 @@ namespace NKikimr::NHttpProxy {
468468
auto queueUrl = QueueUrlExtractor(Request);
469469
if (!queueUrl.empty()) {
470470
auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl);
471-
if(cloudIdAndResourceId.Empty()) {
471+
if (cloudIdAndResourceId.first.empty()) {
472472
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
473473
}
474-
CloudId = cloudIdAndResourceId.Get()->first;
475-
ResourceId = cloudIdAndResourceId.Get()->second;
474+
CloudId = cloudIdAndResourceId.first;
475+
ResourceId = cloudIdAndResourceId.second;
476476
}
477477
} catch (const NKikimr::NSQS::TSQSException& e) {
478478
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;

ydb/core/http_proxy/ut/datastreams_fixture.h

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) {
6666
}
6767
}
6868

69-
7069
class THttpProxyTestMock : public NUnitTest::TBaseFixture {
70+
friend class THttpProxyTestMockForSQS;
7171
public:
7272
THttpProxyTestMock() = default;
7373
~THttpProxyTestMock() = default;
@@ -80,12 +80,12 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
8080
InitAll();
8181
}
8282

83-
void InitAll() {
83+
void InitAll(bool yandexCloudMode = true) {
8484
AccessServicePort = PortManager.GetPort(8443);
8585
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
86-
InitKikimr();
86+
InitKikimr(yandexCloudMode);
8787
InitAccessServiceService();
88-
InitHttpServer();
88+
InitHttpServer(yandexCloudMode);
8989
}
9090

9191
static TString FormAuthorizationStr(const TString& region) {
@@ -364,7 +364,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
364364
return resultSet;
365365
}
366366

367-
void InitKikimr() {
367+
void InitKikimr(bool yandexCloudMode) {
368368
AuthFactory = std::make_shared<TIamAuthFactory>();
369369
NKikimrConfig::TAppConfig appConfig;
370370
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
@@ -375,7 +375,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
375375
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);
376376

377377
appConfig.MutableSqsConfig()->SetEnableSqs(true);
378-
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
378+
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
379379
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);
380380

381381
auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
@@ -638,7 +638,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
638638
AccessServiceServer = builder.BuildAndStart();
639639
}
640640

641-
void InitHttpServer() {
641+
void InitHttpServer(bool yandexCloudMode) {
642642
NKikimrConfig::TServerlessProxyConfig config;
643643
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central1");
644644
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central-1");
@@ -648,7 +648,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
648648
config.MutableHttpConfig()->SetAccessServiceEndpoint(TStringBuilder() << "127.0.0.1:" << AccessServicePort);
649649
config.SetTestMode(true);
650650
config.MutableHttpConfig()->SetPort(HttpServicePort);
651-
config.MutableHttpConfig()->SetYandexCloudMode(true);
651+
config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode);
652652
config.MutableHttpConfig()->SetYmqEnabled(true);
653653

654654
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYdb::CreateOAuthCredentialsProviderFactory("proxy_sa@builtin");
@@ -753,3 +753,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
753753
ui16 MonPort = 0;
754754
ui16 KikimrGrpcPort = 0;
755755
};
756+
757+
class THttpProxyTestMockForSQS : public THttpProxyTestMock {
758+
void SetUp(NUnitTest::TTestContext&) override {
759+
InitAll(false);
760+
}
761+
};

ydb/core/http_proxy/ut/http_proxy_ut.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1663,6 +1663,18 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
16631663
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
16641664
}
16651665

1666+
Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
1667+
NJson::TJsonValue sendMessageReq;
1668+
sendMessageReq["QueueUrl"] = "";
1669+
auto body = "MessageBody-0";
1670+
sendMessageReq["MessageBody"] = body;
1671+
sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0";
1672+
sendMessageReq["MessageGroupId"] = "MessageGroupId-0";
1673+
1674+
auto res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
1675+
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
1676+
}
1677+
16661678
Y_UNIT_TEST_F(TestReceiveMessage, THttpProxyTestMock) {
16671679
auto createQueueReq = CreateSqsCreateQueueRequest();
16681680
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));

ydb/services/ymq/utils.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#include "utils.h"
2+
3+
#include <util/string/split.h>
4+
#include <ydb/core/ymq/base/utils.h>
5+
6+
7+
namespace NKikimr::NYmq {
8+
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
9+
auto protocolSeparator = queueUrl.find("://");
10+
if (protocolSeparator == TString::npos) {
11+
return {"", ""};
12+
}
13+
14+
auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
15+
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
16+
if (parts.size() < 3) {
17+
return {"", ""};
18+
}
19+
20+
bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
21+
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
22+
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
23+
return {accountName, queueName};
24+
}
25+
}

ydb/services/ymq/utils.h

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,7 @@
11
#pragma once
22

33
#include <util/generic/string.h>
4-
#include <util/generic/maybe.h>
5-
#include <util/string/split.h>
6-
#include <ydb/core/ymq/base/utils.h>
74

85
namespace NKikimr::NYmq {
9-
inline static TMaybe<std::pair<TString, TString>> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
10-
auto protocolSeparator = queueUrl.find("://");
11-
if (protocolSeparator == TString::npos) {
12-
return Nothing();
13-
}
14-
15-
auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
16-
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
17-
if (parts.size() < 3) {
18-
return Nothing();
19-
}
20-
21-
bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
22-
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
23-
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
24-
return std::pair<TString, TString>(std::move(accountName), std::move(queueName));
25-
}
6+
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl);
267
}

ydb/services/ymq/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
ymq_proxy.cpp
55
grpc_service.cpp
6+
utils.cpp
67
)
78

89
PEERDIR(

ydb/services/ymq/ymq_proxy.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ namespace NKikimr::NYmq::V1 {
290290
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
291291
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);
292292

293-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
293+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
294294

295295
result->SetMessageBody(GetProtoRequest()->Getmessage_body());
296296

@@ -376,7 +376,7 @@ namespace NKikimr::NYmq::V1 {
376376
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
377377
auto result = requestHolder.MutableReceiveMessage();
378378

379-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
379+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
380380

381381
COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
382382
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
@@ -496,7 +496,7 @@ namespace NKikimr::NYmq::V1 {
496496
private:
497497
NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
498498
auto result = requestHolder.MutableGetQueueAttributes();
499-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
499+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
500500
for (const auto& attributeName : GetProtoRequest()->Getattribute_names()) {
501501
result->MutableNames()->Add()->assign(attributeName);
502502
}
@@ -574,7 +574,7 @@ namespace NKikimr::NYmq::V1 {
574574
private:
575575
NKikimr::NSQS::TDeleteMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
576576
auto result = requestHolder.MutableDeleteMessage();
577-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
577+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
578578
result->SetReceiptHandle(GetProtoRequest()->receipt_handle());
579579
return result;
580580
}
@@ -607,7 +607,7 @@ namespace NKikimr::NYmq::V1 {
607607
private:
608608
NKikimr::NSQS::TPurgeQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
609609
auto result = requestHolder.MutablePurgeQueue();
610-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
610+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
611611
return result;
612612
}
613613
};
@@ -639,7 +639,7 @@ namespace NKikimr::NYmq::V1 {
639639
private:
640640
NKikimr::NSQS::TDeleteQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
641641
auto result = requestHolder.MutableDeleteQueue();
642-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
642+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
643643
return result;
644644
}
645645
};
@@ -671,7 +671,7 @@ namespace NKikimr::NYmq::V1 {
671671
private:
672672
NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(TSqsRequest& requestHolder) override {
673673
auto result = requestHolder.MutableChangeMessageVisibility();
674-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
674+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
675675
result->SetReceiptHandle(GetProtoRequest()->Getreceipt_handle());
676676
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
677677
return result;
@@ -713,7 +713,7 @@ namespace NKikimr::NYmq::V1 {
713713
private:
714714
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
715715
auto result = requestHolder.MutableSetQueueAttributes();
716-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
716+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
717717
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
718718
AddAttribute(requestHolder, name, value);
719719
}
@@ -751,7 +751,7 @@ namespace NKikimr::NYmq::V1 {
751751
private:
752752
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(TSqsRequest& requestHolder) override {
753753
auto result = requestHolder.MutableListDeadLetterSourceQueues();
754-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
754+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
755755
return result;
756756
}
757757
};
@@ -803,7 +803,7 @@ namespace NKikimr::NYmq::V1 {
803803
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
804804
auto result = requestHolder.MutableSendMessageBatch();
805805

806-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
806+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
807807

808808
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
809809
auto entry = requestHolder.MutableSendMessageBatch()->MutableEntries()->Add();
@@ -870,7 +870,7 @@ namespace NKikimr::NYmq::V1 {
870870
private:
871871
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
872872
auto result = requestHolder.MutableDeleteMessageBatch();
873-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
873+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
874874
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
875875
auto entry = requestHolder.MutableDeleteMessageBatch()->AddEntries();
876876
entry->SetId(requestEntry.Getid());
@@ -921,7 +921,7 @@ namespace NKikimr::NYmq::V1 {
921921
private:
922922
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
923923
auto result = requestHolder.MutableChangeMessageVisibilityBatch();
924-
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
924+
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
925925
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
926926
auto entry = requestHolder.MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
927927
entry->SetId(requestEntry.Getid());

0 commit comments

Comments
 (0)