Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 88 additions & 40 deletions ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>

#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h>

#include <unordered_set>

namespace NYdb::NFederatedTopic {

using NTopic::TPrintable;
Expand All @@ -16,10 +20,17 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFeder
using TPtr = TIntrusivePtr<TFederatedPartitionSession>;

public:
TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr<TDbInfo> db)
TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession,
std::shared_ptr<TDbInfo> db,
std::shared_ptr<TDbInfo> originDb = nullptr,
TString originPath = "")
: PartitionSession(partitionSession)
, Db(std::move(db))
{}
, ReadSourceDatabase(std::move(db))
, TopicOriginDatabase(originDb ? std::move(originDb) : ReadSourceDatabase)
, TopicOriginPath(originPath ? std::move(originPath) : PartitionSession->GetTopicPath())
{
Y_ABORT_UNLESS(ReadSourceDatabase);
}

//! Request partition session status.
//! Result will come to TPartitionSessionStatusEvent.
Expand All @@ -39,7 +50,7 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFeder

//! Topic path.
const TString& GetTopicPath() const {
return PartitionSession->GetTopicPath();
return TopicOriginPath;
}

//! Partition id.
Expand All @@ -48,34 +59,56 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFeder
}

const TString& GetDatabaseName() const {
return Db->name();
return GetTopicOriginDatabaseName();
}

const TString& GetDatabasePath() const {
return Db->path();
return GetTopicOriginDatabasePath();
}

const TString& GetDatabaseId() const {
return Db->id();
return GetTopicOriginDatabaseId();
}

const TString& GetReadSourceDatabaseName() const {
return ReadSourceDatabase->name();
}

const TString& GetReadSourceDatabasePath() const {
return ReadSourceDatabase->path();
}

const TString& GetReadSourceDatabaseId() const {
return ReadSourceDatabase->id();
}

const TString& GetTopicOriginDatabaseName() const {
return TopicOriginDatabase->name();
}

const TString& GetTopicOriginDatabasePath() const {
return TopicOriginDatabase->path();
}

const TString& GetTopicOriginDatabaseId() const {
return TopicOriginDatabase->id();
}

private:
NTopic::TPartitionSession::TPtr PartitionSession;
std::shared_ptr<TDbInfo> Db;
std::shared_ptr<TDbInfo> ReadSourceDatabase;
std::shared_ptr<TDbInfo> TopicOriginDatabase;
TString TopicOriginPath;
};

//! Events for read session.
struct TReadSessionEvent {
class TFederatedPartitionSessionAccessor {
public:
TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession)
explicit TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession)
: FederatedPartitionSession(std::move(partitionSession))
{}

TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db)
: FederatedPartitionSession(MakeIntrusive<TFederatedPartitionSession>(partitionSession, std::move(db)))
{}

inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const {
return FederatedPartitionSession;
}
Expand All @@ -88,8 +121,8 @@ struct TReadSessionEvent {
struct TFederated : public TFederatedPartitionSessionAccessor, public TEvent, public TPrintable<TFederated<TEvent>> {
using TPrintable<TFederated<TEvent>>::DebugString;

TFederated(TEvent event, std::shared_ptr<TDbInfo> db)
: TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db)
TFederated(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession)
: TFederatedPartitionSessionAccessor(std::move(federatedPartitionSession))
, TEvent(std::move(event))
{}

Expand All @@ -109,10 +142,7 @@ struct TReadSessionEvent {
using TCompressedMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>;

public:
TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db);

TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db);
TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession);

const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override {
ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead";
Expand Down Expand Up @@ -177,15 +207,6 @@ struct TReadSessionEvent {
TSessionClosedEvent>;
};

template <typename TEvent>
TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, std::shared_ptr<TDbInfo> db) {
return {std::move(event), std::move(db)};
}

TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db);

TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db);

//! Set of offsets to commit.
//! Class that could store offsets in order to commit them later.
//! This class is not thread safe.
Expand Down Expand Up @@ -273,7 +294,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
bool GracefulStopAfterCommit;
};


//! Set simple handler with data processing and also
//! set other handlers with default behaviour.
//! They automatically commit data after processing
Expand All @@ -290,7 +310,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
//! commitDataAfterProcessing: automatically commit data after calling of dataHandler.
//! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming
//! partition session destroy.

TSimpleDataHandlers SimpleDataHandlers_;

TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler,
Expand Down Expand Up @@ -368,18 +387,47 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
//! See description in TFederatedEventHandlers class.
FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers);

enum class EReadPolicy {
READ_ALL = 0,
READ_ORIGINAL,
READ_MIRRORED

//! Read policy settings

//! Databases to read from.
//! Default (empty) value means reading from all available databases.
//! Adding duplicates or unavailable databases is okay, they will be ignored.
struct TReadOriginalSettings {
//! Add reading from specified database if it's available.
TReadOriginalSettings& AddDatabase(TString database);

//! Add reading from several specified databases, if available.
TReadOriginalSettings& AddDatabases(std::vector<TString> databases);

//! Add reading from database(s) with the same location as client.
TReadOriginalSettings& AddLocal();

std::unordered_set<TString> Databases;
};

//! Policy for federated reading.
//!
//! READ_ALL: read will be done from all topic instances from all databases.
//! READ_ORIGINAL:
//! READ_MIRRORED:
FLUENT_SETTING_DEFAULT(EReadPolicy, ReadPolicy, EReadPolicy::READ_ALL);
//! Default variant.
//! Read original topics specified in NTopic::TReadSessionSettings::Topics from databases, specified in settings.
//! Discards previously set ReadOriginal and ReadMirrored settings.
TSelf& ReadOriginal(TReadOriginalSettings settings);

//! Read original and mirrored topics specified in NTopic::TReadSessionSettings::Topics
//! from one specified database.
//! Discards previously set ReadOriginal and ReadMirrored settings.
TSelf& ReadMirrored(TString database);

bool IsReadMirroredEnabled() {
return ReadMirroredEnabled;
}

auto GetDatabasesToReadFrom() {
return DatabasesToReadFrom;
}

private:
// Read policy settings, set via helpers above
bool ReadMirroredEnabled = false;
std::unordered_set<TString> DatabasesToReadFrom;
};


Expand Down
Loading