Skip to content

If optional is empty, get proper JSON value #15168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/core/ymq/actor/index_events_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ void TSearchEventsProcessor::OnQueuesListQueryComplete(NKqp::TEvKqp::TEvQueryRes
auto customName = *parser.ColumnParser(2).GetOptionalUtf8();
auto createTs = *parser.ColumnParser(3).GetOptionalUint64();
auto folderId = *parser.ColumnParser(4).GetOptionalUtf8();
auto tags = *parser.ColumnParser(5).GetOptionalUtf8();
auto tags = parser.ColumnParser(5).GetOptionalUtf8().value_or("{}");
auto insResult = ExistingQueues.insert(std::make_pair(
queueName, TQueueEvent{EQueueEventType::Existed, createTs, customName, cloudId, folderId, tags}
));
Expand Down Expand Up @@ -197,7 +197,7 @@ void TSearchEventsProcessor::OnEventsListingDone(NKqp::TEvKqp::TEvQueryResponse:
auto customName = *parser.ColumnParser(3).GetOptionalUtf8();
auto timestamp = *parser.ColumnParser(4).GetOptionalUint64();
auto folderId = *parser.ColumnParser(5).GetOptionalUtf8();
auto labels = *parser.ColumnParser(6).GetOptionalUtf8();
auto labels = parser.ColumnParser(6).GetOptionalUtf8().value_or("{}");
auto& qEvents = QueuesEvents[queueName];
auto insResult = qEvents.insert(std::make_pair(
timestamp, TQueueEvent{EQueueEventType(evType), timestamp, customName, cloudId, folderId, labels}
Expand Down
49 changes: 32 additions & 17 deletions ydb/core/ymq/actor/yc_search_ut/index_events_processor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class TIndexProcesorTests : public TTestBase {
UNIT_ASSERT(statusVal.IsSuccess());
}
void AddEvent(
const TString& account, const TString& queueName, const EEvType& type, TInstant ts = TInstant::Zero(), TString labels = "{}")
const TString& account, const TString& queueName, const EEvType& type, TInstant ts = TInstant::Zero(), TMaybe<TString> labels = "{}")
{
if (ts == TInstant::Zero())
ts = CurrTs;
Expand All @@ -143,12 +143,12 @@ class TIndexProcesorTests : public TTestBase {
<< "\"myQueueCustomName\", "
<< ts.MilliSeconds() << ", "
<< "\"myFolder\", "
<< "\"" << labels << "\""
<< (labels.Defined() ? "\"" + labels.GetRef() + "\"" : "NULL")
<< ");";
ExecDataQuery(queryBuilder.c_str());
}

void AddQueue(const TString& account, const TString& queueName, TInstant ts = TInstant::Zero(), TString tags = "{}") {
void AddQueue(const TString& account, const TString& queueName, TInstant ts = TInstant::Zero(), TMaybe<TString> tags = "{}") {
if (ts == TInstant::Zero())
ts = CurrTs;
TStringBuilder queryBuilder;
Expand All @@ -159,12 +159,12 @@ class TIndexProcesorTests : public TTestBase {
<< "\"myQueueCustomName\", "
<< ts.MilliSeconds() << ", "
<< "\"myFolder\", "
<< "\"" << tags << "\""
<< (tags.Defined() ? "\"" + tags.GetRef() + "\"" : "NULL")
<< ");";
ExecDataQuery(queryBuilder.c_str());
}

void AddQueuesBatch(const TString& account, const TString& queueNameBase, ui64 count, ui64 startIndex = 0, TString tags = "{}") {
void AddQueuesBatch(const TString& account, const TString& queueNameBase, ui64 count, ui64 startIndex = 0, TMaybe<TString> tags = "{}") {
Cerr << "===Started add queue batch\n";
TDeque<NYdb::NTable::TAsyncDataQueryResult> results;
ui64 maxInflight = 1;
Expand All @@ -181,7 +181,7 @@ class TIndexProcesorTests : public TTestBase {
<< "\"myQueueCustomName\", "
<< CurrTs.MilliSeconds() << ", "
<< "\"myFolder\", "
<< "\"" << tags << "\""
<< (tags.Defined() ? "\"" + tags.GetRef() + "\"" : "NULL")
<< ");";

auto preparedResult = session.PrepareDataQuery(queryBuilder.c_str()).GetValueSync();
Expand Down Expand Up @@ -323,41 +323,56 @@ class TIndexProcesorTests : public TTestBase {
TTestRunner("CreateIndexProcessor", this);
}

void TestSingleCreateQueueEvent() {
void CheckSingleCreateQueueEvent(bool nullLabels) {
TTestRunner runner{"SingleCreateQueueEvent", this};
const TString labels = "{\"k1\": \"v1\"}";
const TString escapedLabels = EscapeC(labels);
runner.AddEvent( "cloud1", "queue1", EEvType::Created, {}, escapedLabels);
runner.AddEvent("cloud1", "queue1", EEvType::Created, {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
runner.DispatchEvents();
auto messages = runner.EventsWriter->GetMessages();
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); // Events, reindex
CheckEventsLine(messages[0], EEvType::Created, {}, labels);
CheckEventsLine(messages[1], EEvType::Existed, {}, labels);
CheckEventsLine(messages[0], EEvType::Created, {}, nullLabels ? "{}" : labels);
CheckEventsLine(messages[1], EEvType::Existed, {}, nullLabels ? "{}" : labels);
UNIT_ASSERT_VALUES_EQUAL(runner.CountEvents(), 0);
}

void TestReindexSingleQueue() {
void TestSingleCreateQueueEvent() {
CheckSingleCreateQueueEvent(false);
CheckSingleCreateQueueEvent(true);
}

void CheckReindexSingleQueue(bool nullLabels) {
TTestRunner runner{"ReindexSingleQueue", this};
const TString labels = "{\"k1\": \"v1\"}";
const TString escapedLabels = EscapeC(labels);
runner.AddQueue("cloud1", "queue1", {}, escapedLabels);
runner.AddQueue("cloud1", "queue1", {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
runner.DispatchEvents();
auto messages = runner.EventsWriter->GetMessages();
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
CheckEventsLine(messages[0], EEvType::Existed, {}, labels);
CheckEventsLine(messages[0], EEvType::Existed, {}, nullLabels ? "{}" : labels);
}

void TestDeletedQueueNotReindexed() {
void TestReindexSingleQueue() {
CheckReindexSingleQueue(false);
CheckReindexSingleQueue(true);
}

void CheckDeletedQueueNotReindexed(bool nullLabels) {
TTestRunner runner{"DeletedQueueNotReindexed", this};
const TString labels = "{\"k1\": \"v1\"}";
const TString escapedLabels = EscapeC(labels);
runner.AddQueue("cloud1", "queue2", runner.PrevTs, escapedLabels);
runner.AddEvent("cloud1", "queue2", EEvType::Deleted, {}, escapedLabels);
runner.AddQueue("cloud1", "queue2", runner.PrevTs, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
runner.AddEvent("cloud1", "queue2", EEvType::Deleted, {}, nullLabels ? Nothing() : TMaybe<TString>(escapedLabels));
Sleep(TDuration::Seconds(1));
runner.DispatchEvents();
auto messages = runner.EventsWriter->GetMessages();
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
CheckEventsLine(messages[0], EEvType::Deleted, {}, labels);
CheckEventsLine(messages[0], EEvType::Deleted, {}, nullLabels ? "{}" : labels);
}

void TestDeletedQueueNotReindexed() {
CheckDeletedQueueNotReindexed(false);
CheckDeletedQueueNotReindexed(true);
}

void TestManyMessages() {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/ymq/queues/common/queries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ extern const char* const MatchQueueAttributesQuery = R"__(
(And
(And
(And (Equal (Member queuesRead 'Shards) shards)
(Equal (Member queuesRead 'Tags) tags))
(Equal (Coalesce (Member queuesRead 'Tags) (Utf8String '"{}")) tags))
(Equal (Member queuesRead 'Partitions) partitions))
(Equal (Member queuesRead 'FifoQueue) fifo))
(Equal (Coalesce (Member queuesRead 'DlqName) (Utf8String '"")) dlqName))
(Bool 'true)))
(Bool 'false)))

(let attrRow '(
)__" ATTRS_KEYS_PARAM R"__(
Expand All @@ -122,7 +122,7 @@ extern const char* const MatchQueueAttributesQuery = R"__(
(Equal (Member attrRead 'MessageRetentionPeriod) retention)))
(Equal (Member attrRead 'VisibilityTimeout) visibility))
(Equal (Coalesce (Member attrRead 'MaxReceiveCount) (Uint64 '0)) maxReceiveCount))
(Bool 'true)))
(Bool 'false)))

(let sameVersion
(Equal currentVersion expectedVersion))
Expand Down
Loading