Skip to content

Commit fbdd3f2

Browse files
issues (except logging)
1 parent 9eeacb2 commit fbdd3f2

File tree

4 files changed

+39
-14
lines changed

4 files changed

+39
-14
lines changed

ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h

+12-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
66

7+
#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h>
8+
79
#include <unordered_set>
810

911
namespace NYdb::NFederatedTopic {
@@ -397,9 +399,18 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
397399
//! Discards previously set ReadOriginal and ReadMirrored settings.
398400
TSelf& ReadMirrored(TString database);
399401

402+
bool IsReadMirroredEnabled() {
403+
return ReadMirroredEnabled;
404+
}
405+
406+
auto GetDatabasesToReadFrom() {
407+
return DatabasesToReadFrom;
408+
}
409+
410+
private:
400411
// Read policy settings, set via helpers above
401412
bool ReadMirroredEnabled = false;
402-
std::unordered_set<TString> Databases;
413+
std::unordered_set<TString> DatabasesToReadFrom;
403414
};
404415

405416

ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp

+18-7
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSession
6464
, Observer(std::move(observer))
6565
, AsyncInit(Observer->WaitForFirstState())
6666
, FederationState(nullptr)
67+
, Log(Connections->GetLog())
6768
, SessionId(CreateGuidAsString())
6869
{
6970
}
@@ -101,15 +102,17 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() {
101102
CloseImpl();
102103
return;
103104
}
104-
if (Settings.ReadMirroredEnabled) {
105-
Y_ABORT_UNLESS(Settings.Databases.size() == 1);
106-
// add -mirrored-from- topics to Settings
105+
if (Settings.IsReadMirroredEnabled()) {
106+
Y_ABORT_UNLESS(Settings.GetDatabasesToReadFrom().size() == 1);
107+
auto dbToReadFrom = *Settings.GetDatabasesToReadFrom().begin();
107108

108-
// how to get mirrors in general case???
109-
std::vector<TString> dcNames = {"sas", "vla", "klg", "vlx"};
109+
std::vector<TString> dcNames = GetAllFederationLocations();
110110
auto topics = Settings.Topics_;
111111
for (const auto& topic : topics) {
112112
for (const auto& dc : dcNames) {
113+
if (AsciiEqualsIgnoreCase(dc, dbToReadFrom)) {
114+
continue;
115+
}
113116
auto mirroredTopic = topic;
114117
mirroredTopic.PartitionIds_.clear();
115118
mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dc);
@@ -134,17 +137,25 @@ void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() {
134137
OpenSubSessionsImpl(databases);
135138
}
136139

140+
std::vector<TString> TFederatedReadSessionImpl::GetAllFederationLocations() {
141+
std::vector<TString> result;
142+
for (const auto& db : FederationState->DbInfos) {
143+
result.push_back(db->location());
144+
}
145+
return result;
146+
}
147+
137148
bool TFederatedReadSessionImpl::IsDatabaseEligibleForRead(const std::shared_ptr<TDbInfo>& db) {
138149
if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE &&
139150
db->status() != TDbInfo::Status::DatabaseInfo_Status_READ_ONLY) {
140151
return false;
141152
}
142153

143-
if (Settings.Databases.empty()) {
154+
if (Settings.GetDatabasesToReadFrom().empty()) {
144155
return true;
145156
}
146157

147-
for (const auto& dbFromSettings : Settings.Databases) {
158+
for (const auto& dbFromSettings : Settings.GetDatabasesToReadFrom()) {
148159
if (AsciiEqualsIgnoreCase(db->name(), dbFromSettings) ||
149160
AsciiEqualsIgnoreCase(db->id(), dbFromSettings)) {
150161
return true;

ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext<TFederat
4646
}
4747

4848
private:
49-
// TODO logging
5049
TStringBuilder GetLogPrefix() const;
5150

5251
void Start();
5352
bool ValidateSettings();
5453
void OpenSubSessionsImpl(const std::vector<std::shared_ptr<TDbInfo>>& dbInfos);
5554

55+
std::vector<TString> GetAllFederationLocations();
56+
5657
bool IsDatabaseEligibleForRead(const std::shared_ptr<TDbInfo>& db);
5758

5859
void OnFederatedStateUpdateImpl();
@@ -70,8 +71,7 @@ class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext<TFederat
7071
NThreading::TFuture<void> AsyncInit;
7172
std::shared_ptr<TFederatedDbState> FederationState;
7273

73-
// TODO
74-
// TLog Log;
74+
TLog Log;
7575

7676
const TString SessionId;
7777
const TInstant StartSessionTime = TInstant::Now();

ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@ TReadOriginalSettings& TReadOriginalSettings::AddLocal() {
2323
}
2424

2525
TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadOriginal(TReadOriginalSettings settings) {
26-
std::swap(Databases, settings.Databases);
26+
std::swap(DatabasesToReadFrom, settings.Databases);
2727
ReadMirroredEnabled = false;
2828
return *this;
2929
}
3030

3131
TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(TString database) {
32-
Databases.clear();
33-
Databases.insert(std::move(database));
32+
if (database == "_local") {
33+
ythrow TContractViolation("Reading from local database not supported, use specific database");
34+
}
35+
DatabasesToReadFrom.clear();
36+
DatabasesToReadFrom.insert(std::move(database));
3437
ReadMirroredEnabled = true;
3538
return *this;
3639
}

0 commit comments

Comments
 (0)