Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ydb/core/tx/datashard/change_exchange_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/lib/base/msgbus_status.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -155,7 +157,10 @@ class TCdcPartitionWorker: public TActorBootstrapped<TCdcPartitionWorker> {

}; // TCdcPartitionWorker

class TCdcWorker: public TActorBootstrapped<TCdcWorker>, private TSchemeCacheHelpers {
class TCdcWorker
: public TActorBootstrapped<TCdcWorker>
, private NSchemeCache::TSchemeCacheHelpers
{
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
LogPrefix = TStringBuilder()
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/change_sender_async_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/tablet_flat/flat_row_eggs.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>

Expand Down Expand Up @@ -325,7 +327,7 @@ class TAsyncIndexChangeSenderMain
: public TActorBootstrapped<TAsyncIndexChangeSenderMain>
, public TBaseChangeSender
, public IChangeSenderResolver
, private TSchemeCacheHelpers
, private NSchemeCache::TSchemeCacheHelpers
{
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/change_sender_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/tx/scheme_cache/helpers.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/services/lib/sharding/sharding.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
Expand Down Expand Up @@ -288,7 +290,7 @@ class TCdcChangeSenderMain
: public TActorBootstrapped<TCdcChangeSenderMain>
, public TBaseChangeSender
, public IChangeSenderResolver
, private TSchemeCacheHelpers
, private NSchemeCache::TSchemeCacheHelpers
{
struct TPQPartitionInfo {
ui32 PartitionId;
Expand Down
95 changes: 0 additions & 95 deletions ydb/core/tx/datashard/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "change_exchange_helpers.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -170,99 +169,5 @@ class TBaseChangeSender: public IChangeSender {

}; // TBaseChangeSender

struct TSchemeCacheHelpers {
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet;
using TResolve = NSchemeCache::TSchemeCacheRequest;
using TEvResolve = TEvTxProxySchemeCache::TEvResolveKeySet;
using TCheckFailFunc = std::function<void(const TString&)>;

inline static TNavigate::TEntry MakeNavigateEntry(const TTableId& tableId, TNavigate::EOp op) {
TNavigate::TEntry entry;
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
entry.TableId = tableId;
entry.Operation = op;
entry.ShowPrivatePath = true;
return entry;
}

template <typename T>
static bool CheckNotEmpty(const TStringBuf marker, const TAutoPtr<T>& result, TCheckFailFunc onFailure) {
if (result) {
return true;
}

onFailure(TStringBuilder() << "Empty result at '" << marker << "'");
return false;
}

template <typename T>
static bool CheckEntriesCount(const TStringBuf marker, const TAutoPtr<T>& result, ui32 expected, TCheckFailFunc onFailure) {
if (result->ResultSet.size() == expected) {
return true;
}

onFailure(TStringBuilder() << "Unexpected entries count at '" << marker << "'"
<< ": expected# " << expected
<< ", got# " << result->ResultSet.size()
<< ", result# " << result->ToString(*AppData()->TypeRegistry));
return false;
}

inline static const TTableId& GetTableId(const TNavigate::TEntry& entry) {
return entry.TableId;
}

inline static const TTableId& GetTableId(const TResolve::TEntry& entry) {
return entry.KeyDescription->TableId;
}

template <typename T>
static bool CheckTableId(const TStringBuf marker, const T& entry, const TTableId& expected, TCheckFailFunc onFailure) {
if (GetTableId(entry).HasSamePath(expected)) {
return true;
}

onFailure(TStringBuilder() << "Unexpected table id at '" << marker << "'"
<< ": expected# " << expected
<< ", got# " << GetTableId(entry)
<< ", entry# " << entry.ToString());
return false;
}

inline static bool IsSucceeded(TNavigate::EStatus status) {
return status == TNavigate::EStatus::Ok;
}

inline static bool IsSucceeded(TResolve::EStatus status) {
return status == TResolve::EStatus::OkData;
}

template <typename T>
static bool CheckEntrySucceeded(const TStringBuf marker, const T& entry, TCheckFailFunc onFailure) {
if (IsSucceeded(entry.Status)) {
return true;
}

onFailure(TStringBuilder() << "Failed entry at '" << marker << "'"
<< ": entry# " << entry.ToString());
return false;
}

template <typename T>
static bool CheckEntryKind(const TStringBuf marker, const T& entry, TNavigate::EKind expected, TCheckFailFunc onFailure) {
if (entry.Kind == expected) {
return true;
}

onFailure(TStringBuilder() << "Unexpected entry kind at '" << marker << "'"
<< ", expected# " << static_cast<ui32>(expected)
<< ", got# " << static_cast<ui32>(entry.Kind)
<< ", entry# " << entry.ToString());
return false;
}

}; // TSchemeCacheHelpers

} // NDataShard
} // NKikimr
105 changes: 105 additions & 0 deletions ydb/core/tx/scheme_cache/helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#pragma once

#include <ydb/core/tx/scheme_cache/scheme_cache.h>

#include <util/string/builder.h>

#include <functional>

namespace NKikimr::NSchemeCache {

struct TSchemeCacheHelpers {
using TNavigate = TSchemeCacheNavigate;
using TEvNavigate = TEvTxProxySchemeCache::TEvNavigateKeySet;
using TResolve = TSchemeCacheRequest;
using TEvResolve = TEvTxProxySchemeCache::TEvResolveKeySet;
using TCheckFailFunc = std::function<void(const TString&)>;

inline static TNavigate::TEntry MakeNavigateEntry(const TTableId& tableId, TNavigate::EOp op) {
TNavigate::TEntry entry;
entry.RequestType = TNavigate::TEntry::ERequestType::ByTableId;
entry.TableId = tableId;
entry.Operation = op;
entry.ShowPrivatePath = true;
return entry;
}

template <typename T>
static bool CheckNotEmpty(const TStringBuf marker, const TAutoPtr<T>& result, TCheckFailFunc onFailure) {
if (result) {
return true;
}

onFailure(TStringBuilder() << "Empty result at '" << marker << "'");
return false;
}

template <typename T>
static bool CheckEntriesCount(const TStringBuf marker, const TAutoPtr<T>& result, ui32 expected, TCheckFailFunc onFailure) {
if (result->ResultSet.size() == expected) {
return true;
}

onFailure(TStringBuilder() << "Unexpected entries count at '" << marker << "'"
<< ": expected# " << expected
<< ", got# " << result->ResultSet.size()
<< ", result# " << result->ToString(*AppData()->TypeRegistry));
return false;
}

inline static const TTableId& GetTableId(const TNavigate::TEntry& entry) {
return entry.TableId;
}

inline static const TTableId& GetTableId(const TResolve::TEntry& entry) {
return entry.KeyDescription->TableId;
}

template <typename T>
static bool CheckTableId(const TStringBuf marker, const T& entry, const TTableId& expected, TCheckFailFunc onFailure) {
if (GetTableId(entry).HasSamePath(expected)) {
return true;
}

onFailure(TStringBuilder() << "Unexpected table id at '" << marker << "'"
<< ": expected# " << expected
<< ", got# " << GetTableId(entry)
<< ", entry# " << entry.ToString());
return false;
}

inline static bool IsSucceeded(TNavigate::EStatus status) {
return status == TNavigate::EStatus::Ok;
}

inline static bool IsSucceeded(TResolve::EStatus status) {
return status == TResolve::EStatus::OkData;
}

template <typename T>
static bool CheckEntrySucceeded(const TStringBuf marker, const T& entry, TCheckFailFunc onFailure) {
if (IsSucceeded(entry.Status)) {
return true;
}

onFailure(TStringBuilder() << "Failed entry at '" << marker << "'"
<< ": entry# " << entry.ToString());
return false;
}

template <typename T>
static bool CheckEntryKind(const TStringBuf marker, const T& entry, TNavigate::EKind expected, TCheckFailFunc onFailure) {
if (entry.Kind == expected) {
return true;
}

onFailure(TStringBuilder() << "Unexpected entry kind at '" << marker << "'"
<< ", expected# " << static_cast<ui32>(expected)
<< ", got# " << static_cast<ui32>(entry.Kind)
<< ", entry# " << entry.ToString());
return false;
}

}; // TSchemeCacheHelpers

}