Skip to content

Implemented RemoteTopicReader KIKIMR-20306 #1093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ struct TKikimrEvents : TEvents {
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
};
};

Expand Down
47 changes: 47 additions & 0 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "table_writer.h"
#include "worker.h"

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>

namespace NKikimr::NReplication::NService {

class TLocalTableWriter: public TActor<TLocalTableWriter> {
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
Worker = ev->Sender;
Send(Worker, new TEvWorker::TEvHandshake());
}

void Handle(TEvWorker::TEvData::TPtr& ev) {
Worker = ev->Sender;
// TODO
}

public:
explicit TLocalTableWriter(const TString& path)
: TActor(&TThis::StateWork)
, Path(path)
{
Y_UNUSED(Path);
}

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
const TString Path;

TActorId Worker;

}; // TLocalTableWriter

IActor* CreateLocalTableWriter(const TString& path) {
return new TLocalTableWriter(path);
}

}
9 changes: 9 additions & 0 deletions ydb/core/tx/replication/service/table_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/core/base/defs.h>

namespace NKikimr::NReplication::NService {

IActor* CreateLocalTableWriter(const TString& path);

}
110 changes: 110 additions & 0 deletions ydb/core/tx/replication/service/topic_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "topic_reader.h"
#include "worker.h"

#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>

namespace NKikimr::NReplication::NService {

class TRemoteTopicReader: public TActor<TRemoteTopicReader> {
using TReadSessionSettings = NYdb::NTopic::TReadSessionSettings;

void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
Worker = ev->Sender;

Y_ABORT_UNLESS(!ReadSession);
Send(YdbProxy, new TEvYdbProxy::TEvCreateTopicReaderRequest(Settings));
}

void Handle(TEvYdbProxy::TEvCreateTopicReaderResponse::TPtr& ev) {
ReadSession = ev->Get()->Result;

Y_ABORT_UNLESS(!Worker);
Send(Worker, new TEvWorker::TEvHandshake());
}

void Handle(TEvWorker::TEvPoll::TPtr&) {
Y_ABORT_UNLESS(ReadSession);
Send(ReadSession, new TEvYdbProxy::TEvReadTopicRequest());

if (CommitOffset) {
Send(YdbProxy, new TEvYdbProxy::TEvCommitOffsetRequest(
Settings.Topics_[0].Path_,
Settings.Topics_[0].PartitionIds_[0],
Settings.ConsumerName_,
CommitOffset, {}
));
}
}

void Handle(TEvYdbProxy::TEvReadTopicResponse::TPtr& ev) {
auto& result = ev->Get()->Result;
TVector<TEvWorker::TEvData::TRecord> records(Reserve(result.Messages.size()));

for (auto& msg : result.Messages) {
Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW);
Y_DEBUG_ABORT_UNLESS(msg.GetOffset() + 1 > CommitOffset);
CommitOffset = Max(CommitOffset, msg.GetOffset() + 1);
records.emplace_back(msg.GetOffset(), std::move(msg.GetData()));
}

Send(Worker, new TEvWorker::TEvData(std::move(records)));
}

void Handle(TEvYdbProxy::TEvCommitOffsetResponse::TPtr& ev) {
if (!ev->Get()->Result.IsSuccess()) {
Leave();
}
}

void Leave() {
Send(Worker, new TEvents::TEvGone());
PassAway();
}

void PassAway() override {
if (const auto& actorId = std::exchange(ReadSession, {})) {
Send(actorId, new TEvents::TEvPoison());
}

TActor::PassAway();
}

public:
explicit TRemoteTopicReader(const TActorId& ydbProxy, const TReadSessionSettings& opts)
: TActor(&TThis::StateWork)
, YdbProxy(ydbProxy)
, Settings(opts)
{
Y_ABORT_UNLESS(Settings.Topics_.size() == 1);
Y_ABORT_UNLESS(Settings.Topics_.at(0).PartitionIds_.size() == 1);
}

STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvPoll, Handle);
hFunc(TEvYdbProxy::TEvCreateTopicReaderResponse, Handle);
hFunc(TEvYdbProxy::TEvReadTopicResponse, Handle);
hFunc(TEvYdbProxy::TEvCommitOffsetResponse, Handle);
sFunc(TEvents::TEvGone, Leave);
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
const TActorId YdbProxy;
const TReadSessionSettings Settings;

TActorId Worker;
TActorId ReadSession;
ui64 CommitOffset = 0;

}; // TRemoteTopicReader

IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts) {
return new TRemoteTopicReader(ydbProxy, opts);
}

}
13 changes: 13 additions & 0 deletions ydb/core/tx/replication/service/topic_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include <ydb/core/base/defs.h>

namespace NYdb::NTopic {
struct TReadSessionSettings;
}

namespace NKikimr::NReplication::NService {

IActor* CreateRemoteTopicReader(const TActorId& ydbProxy, const NYdb::NTopic::TReadSessionSettings& opts);

}
148 changes: 148 additions & 0 deletions ydb/core/tx/replication/service/worker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#include "worker.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/services/services.pb.h>

namespace NKikimr::NReplication::NService {

TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data)
: Offset(offset)
, Data(data)
{
}

TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data)
: Offset(offset)
, Data(std::move(data))
{
}

TEvWorker::TEvData::TEvData(TVector<TRecord>&& records)
: Records(std::move(records))
{
}

class TWorker: public TActorBootstrapped<TWorker> {
struct TActorInfo {
THolder<IActor> Actor;
TActorId ActorId;
bool InitDone;

explicit TActorInfo(THolder<IActor>&& actor)
: Actor(std::move(actor))
, InitDone(false)
{
}

operator TActorId() const {
return ActorId;
}

explicit operator bool() const {
return InitDone;
}
};

TActorId RegisterActor(TActorInfo& info) {
Y_ABORT_UNLESS(info.Actor);
info.ActorId = RegisterWithSameMailbox(info.Actor.Release());
return info.ActorId;
}

void InitActor(TActorInfo& info) {
Y_ABORT_UNLESS(info.ActorId);
Send(info.ActorId, new TEvWorker::TEvHandshake());
info.InitDone = false;
}

void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
if (ev->Sender == Reader) {
Reader.InitDone = true;
} else if (ev->Sender == Writer) {
Writer.InitDone = true;
} else {
// TODO: log warn
}

if (Reader && Writer) {
Send(Reader, new TEvWorker::TEvPoll());
}
}

void Handle(TEvWorker::TEvPoll::TPtr& ev) {
if (ev->Sender != Writer) {
// TODO: log warn
return;
}

Send(ev->Forward(Reader));
}

void Handle(TEvWorker::TEvData::TPtr& ev) {
if (ev->Sender != Reader) {
// TODO: log warn
return;
}

Send(ev->Forward(Writer));
}

void Handle(TEvents::TEvGone::TPtr& ev) {
if (ev->Sender == Reader) {
// TODO
} else if (ev->Sender == Writer) {
// TODO
} else {
// TODO: log warn
}
}

void PassAway() override {
for (auto* actor : {&Reader, &Writer}) {
Send(*actor, new TEvents::TEvPoison());
}

TActorBootstrapped::PassAway();
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_WORKER;
}

explicit TWorker(THolder<IActor>&& reader, THolder<IActor>&& writer)
: Reader(std::move(reader))
, Writer(std::move(writer))
{
}

void Bootstrap() {
for (auto* actor : {&Reader, &Writer}) {
RegisterActor(*actor);
InitActor(*actor);
}

Become(&TThis::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvPoll, Handle);
hFunc(TEvWorker::TEvData, Handle);
hFunc(TEvents::TEvGone, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
TActorInfo Reader;
TActorInfo Writer;
};

IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer) {
return new TWorker(std::move(reader), std::move(writer));
}

}
43 changes: 43 additions & 0 deletions ydb/core/tx/replication/service/worker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>

#include <util/generic/vector.h>

namespace NKikimr::NReplication::NService {

struct TEvWorker {
enum EEv {
EvBegin = EventSpaceBegin(TKikimrEvents::ES_REPLICATION_SERVICE),

EvHandshake,
EvPoll,
EvData,

EvEnd,
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_REPLICATION_SERVICE));

struct TEvHandshake: public TEventLocal<TEvHandshake, EvHandshake> {};
struct TEvPoll: public TEventLocal<TEvPoll, EvPoll> {};

struct TEvData: public TEventLocal<TEvData, EvData> {
struct TRecord {
ui64 Offset;
TString Data;

explicit TRecord(ui64 offset, const TString& data);
explicit TRecord(ui64 offset, TString&& data);
};

TVector<TRecord> Records;

explicit TEvData(TVector<TRecord>&& records);
};
};

IActor* CreateWorker(THolder<IActor>&& reader, THolder<IActor>&& writer);

}
5 changes: 5 additions & 0 deletions ydb/core/tx/replication/service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ LIBRARY()

PEERDIR(
ydb/core/base
ydb/core/tx/replication/ydb_proxy
ydb/library/actors/core
)

SRCS(
service.cpp
table_writer.cpp
topic_reader.cpp
worker.cpp
)

YQL_LAST_ABI_VERSION()
Expand Down
Loading