Skip to content

LocalTableWriter skeleton KIKIMR-20673 #1214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions ydb/core/change_exchange/change_sender_common_ops.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ bool TBaseChangeSender::RequestRecords() {
return false;
}

ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRequestRecords(std::move(records)));
return true;
}

Expand Down Expand Up @@ -431,11 +431,9 @@ void TBaseChangeSender::RemoveRecords() {
}
}

TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
const TActorId& changeServer, const TPathId& pathId)
TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TPathId& pathId)
: ActorOps(actorOps)
, Resolver(resolver)
, ChangeServer(changeServer)
, PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
Expand Down
11 changes: 5 additions & 6 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "change_exchange.h"

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/mon.h>

#include <util/generic/hash.h>
Expand Down Expand Up @@ -52,6 +51,8 @@ class IChangeSender {
public:
virtual ~IChangeSender() = default;

virtual TActorId GetChangeServer() const = 0;

virtual void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) = 0;
virtual void KillSenders() = 0;
virtual IActor* CreateSender(ui64 partitionId) = 0;
Expand Down Expand Up @@ -120,12 +121,12 @@ class TBaseChangeSender: public IChangeSender {
remove.push_back(record.Order);
}

ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}

template <>
void RemoveRecords(TVector<ui64>&& records) {
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
ActorOps->Send(GetChangeServer(), new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
}

void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
Expand All @@ -138,8 +139,7 @@ class TBaseChangeSender: public IChangeSender {
void OnReady(ui64 partitionId) override;
void OnGone(ui64 partitionId) override;

explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
const TActorId& changeServer, const TPathId& pathId);
explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver, const TPathId& pathId);

void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);

Expand All @@ -148,7 +148,6 @@ class TBaseChangeSender: public IChangeSender {
IChangeSenderResolver* const Resolver;

protected:
const TActorId ChangeServer;
const TPathId PathId;

private:
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,10 @@ class TAsyncIndexChangeSenderMain
return StateBase(ev);
}

TActorId GetChangeServer() const override {
return DataShard.ActorId;
}

void Resolve() override {
ResolveIndex();
}
Expand Down Expand Up @@ -794,7 +798,7 @@ class TAsyncIndexChangeSenderMain

explicit TAsyncIndexChangeSenderMain(const TDataShardId& dataShard, const TTableId& userTableId, const TPathId& indexPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, dataShard.ActorId, indexPathId)
, TBaseChangeSender(this, this, indexPathId)
, DataShard(dataShard)
, UserTableId(userTableId)
, IndexTableVersion(0)
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,10 @@ class TCdcChangeSenderMain
return StateBase(ev);
}

TActorId GetChangeServer() const override {
return DataShard.ActorId;
}

void Resolve() override {
ResolveCdcStream();
}
Expand Down Expand Up @@ -760,7 +764,7 @@ class TCdcChangeSenderMain

explicit TCdcChangeSenderMain(const TDataShardId& dataShard, const TPathId& streamPathId)
: TActorBootstrapped()
, TBaseChangeSender(this, this, dataShard.ActorId, streamPathId)
, TBaseChangeSender(this, this, streamPathId)
, DataShard(dataShard)
, TopicVersion(0)
{
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include "change_sender_common_ops.h"

#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/json/json_writer.h>

#include <ydb/core/base/path.h>
#include <ydb/core/change_exchange/change_sender_common_ops.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/user_info.h>
#include <ydb/core/persqueue/write_meta.h>
Expand All @@ -14,6 +10,10 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>

#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/json/json_writer.h>

#include <util/generic/size_literals.h>
#include <util/string/join.h>
#include <util/string/printf.h>
Expand Down Expand Up @@ -2926,7 +2926,7 @@ Y_UNIT_TEST_SUITE(Cdc) {

bool ready = false;
auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) {
if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchangePrivate::EvReady) {
ready = true;
}

Expand All @@ -2948,7 +2948,7 @@ Y_UNIT_TEST_SUITE(Cdc) {

THolder<IEventHandle> delayed;
prevObserver = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvChangeExchangePrivate::EvReady) {
if (ev->GetTypeRewrite() == NChangeExchange::TEvChangeExchangePrivate::EvReady) {
delayed.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
Expand Down
Loading