Skip to content

Stable-24-4: initial commits #14402

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
eab81dc
Bump MKQL_RUNTIME_VERSION and SSA_RUNTIME_VERSION
maximyurchuk Nov 12, 2024
98041ad
Fix compatibility info
maximyurchuk Nov 12, 2024
f6a4561
Add prestable into ci checks (#13927)
maximyurchuk Jan 28, 2025
c84474a
Temporary remove ReleaseApprovers for prestable
maximyurchuk Jan 28, 2025
46e00b8
Enable topics transactions and autopartitioning (#13922)
nshestakov Jan 30, 2025
f1c3a77
[24-4] EnableLocalDBBtreeIndex = true (#13923)
kunga Jan 30, 2025
b6ac41a
[24-4] TSharedCacheConfig.ReplacementPolicy = S3FIFO (#13924)
kunga Jan 30, 2025
3c6a812
prestable-24-4: Enable scale recommender feature flag (#13962)
pixcc Jan 30, 2025
8b87241
VIEW: enable the feature flag by default (#13990)
jepett0 Jan 30, 2025
7b221e9
enable per-database hc cache (#14022)
vporyadke Jan 30, 2025
17d3c4d
24-4: Set EnableDataShardVolatileTransactions = true by default (#14077)
snaury Jan 31, 2025
0ddde76
Revert "[24-4] EnableLocalDBBtreeIndex = true (#13923)" (#14082)
azevaykin Jan 31, 2025
482623c
enable stream lookup 24-4 (#14088)
gridnevvvit Jan 31, 2025
e60149b
EnableImmediateWritingOnBulkUpsert by default (#12272) (#14090)
zverevgeny Jan 31, 2025
a7d68df
Update version definition (#14087)
serbel324 Jan 31, 2025
752664d
Merge result changes 24 4 (#14102)
gridnevvvit Feb 2, 2025
7a1008a
prestable-24-4: enable feature flag EnableAlterDatabaseCreateHiveFirs…
ijon Feb 7, 2025
686068a
Revert "Add prestable into ci checks (#13927)"
maximyurchuk Feb 10, 2025
7ff1dbb
Revert "Temporary remove ReleaseApprovers for prestable"
maximyurchuk Feb 10, 2025
8d77ebe
Revert "enable per-database hc cache (#14022)"
maximyurchuk Feb 10, 2025
8fb35d4
Revert "prestable-24-4: enable feature flag EnableAlterDatabaseCreate…
maximyurchuk Feb 10, 2025
77103ce
SSA_RUNTIME_VERSION -> 4
maximyurchuk Feb 10, 2025
fbee8c8
Revert "prestable-24-4: Enable scale recommender feature flag (#13962)"
maximyurchuk Feb 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/apps/version/version_definition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ NKikimrConfig::TCurrentCompatibilityInfo NKikimr::TCompatibilityInfo::MakeCurren
.Application = "ydb",
.Version = TVersionConstructor{
.Year = 24,
.Major = 3,
.Major = 4,
},
.CanConnectTo = {
TCompatibilityRuleConstructor{
.Application = "nbs",
.LowerLimit = TVersionConstructor{ .Year = 23, .Major = 3 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 3 },
.UpperLimit = TVersionConstructor{ .Year = 24, .Major = 4 },
}
}
}.ToPB();
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/driver_lib/version/version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ const TStored* TCompatibilityInfo::GetDefault(TComponentId componentId) const {
// obsolete version control
TMaybe<NActors::TInterconnectProxyCommon::TVersionInfo> VERSION = NActors::TInterconnectProxyCommon::TVersionInfo{
// version of this binary
"stable-24-3",
"stable-24-4",

// compatible versions; must include all compatible old ones, including this one; version verification occurs on both
// peers and connection is accepted if at least one of peers accepts the version of the other peer
{
"stable-24-2",
"stable-24-3"
"stable-24-3",
"stable-24-4"
}
};

Expand Down
65 changes: 37 additions & 28 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <ydb/public/api/protos/ydb_query.pb.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/wilson_ids/wilson.h>


namespace NKikimr::NGRpcService {

Expand All @@ -25,6 +27,23 @@ struct TProducerState {
TMaybe<ui64> LastSeqNo;
i64 AckedFreeSpaceBytes = 0;
TActorId ActorId;
ui64 ChannelId = 0;

void SendAck(const NActors::TActorIdentity& actor) const {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);

actor.Send(ActorId, resp.Release());
}

bool ResumeIfStopped(const NActors::TActorIdentity& actor, i64 freeSpaceBytes) {
if (LastSeqNo && AckedFreeSpaceBytes <= 0) {
AckedFreeSpaceBytes = freeSpaceBytes;
SendAck(actor);
return true;
}
return false;
}
};

bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to,
Expand Down Expand Up @@ -163,7 +182,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {

TExecuteQueryRPC(TEvExecuteQueryRequest* request, ui64 inflightLimitBytes)
: Request_(request)
, FlowControl_(inflightLimitBytes) {}
, FlowControl_(inflightLimitBytes)
, Span_(TWilsonGrpc::RequestActor, request->GetWilsonTraceId(),
"RequestProxy.RpcOperationRequestActor", NWilson::EFlags::AUTO_END) {}

void Bootstrap(const TActorContext &ctx) {
this->Become(&TExecuteQueryRPC::StateWork);
Expand Down Expand Up @@ -255,7 +276,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
settings,
req->pool_id());

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
ReplyFinishStream(Ydb::StatusIds::INTERNAL_ERROR, std::move(issues));
Expand Down Expand Up @@ -286,28 +307,16 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
}

const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();

for (auto& pair : StreamChannels_) {
const auto& channelId = pair.first;
auto& channel = pair.second;

if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
<< ", channel: " << channelId
<< ", seqNo: " << channel.LastSeqNo
<< ", freeSpace: " << freeSpaceBytes);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*channel.LastSeqNo);
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetChannelId(channelId);

ctx.Send(channel.ActorId, resp.Release());

channel.AckedFreeSpaceBytes = freeSpaceBytes;
if (freeSpaceBytes > 0) {
for (auto& [channelId, channel] : StreamChannels_) {
if (channel.ResumeIfStopped(SelfId(), freeSpaceBytes)) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
<< ", channel: " << channelId
<< ", seqNo: " << channel.LastSeqNo
<< ", freeSpace: " << freeSpaceBytes);
}
}
}

}

void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -328,19 +337,15 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
channel.ActorId = ev->Sender;
channel.LastSeqNo = ev->Get()->Record.GetSeqNo();
channel.AckedFreeSpaceBytes = freeSpaceBytes;
channel.ChannelId = ev->Get()->Record.GetChannelId();

LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Send stream data ack"
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(freeSpaceBytes);
resp->Record.SetChannelId(ev->Get()->Record.GetChannelId());

ctx.Send(channel.ActorId, resp.Release());
channel.SendAck(SelfId());
}

void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
Expand Down Expand Up @@ -415,6 +420,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
void ReplySerializedAndFinishStream(Ydb::StatusIds::StatusCode status, TString&& buf) {
const auto finishStreamFlag = NYdbGrpc::IRequestContextBase::EStreamCtrl::FINISH;
Request_->SendSerializedResult(std::move(buf), status, finishStreamFlag);
NWilson::EndSpanWithStatus(Span_, status);
this->PassAway();
}

Expand Down Expand Up @@ -453,6 +459,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
} else {
Request_->FinishStream(status);
}
NWilson::EndSpanWithStatus(Span_, status);
this->PassAway();
}

Expand All @@ -479,6 +486,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
NKikimrKqp::EQueryAction QueryAction;
TRpcFlowControlState FlowControl_;
TMap<ui64, TProducerState> StreamChannels_;

NWilson::TSpan Span_;
};

} // namespace
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,10 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ExecuterActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo_);
// scan query has single result set, so it's ok to put zero as channelId here.
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ExecuterActorId_, resp.Release());

AckedFreeSpaceBytes_ = freeSpaceBytes;
}
}
Expand Down Expand Up @@ -326,6 +324,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
}

void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {

auto& record = ev->Get()->Record;
NYql::TIssues issues = ev->Get()->GetIssues();

Expand Down Expand Up @@ -361,8 +360,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ class TStreamExecuteYqlScriptRPC
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(ev->Sender, resp.Release());
Expand Down Expand Up @@ -320,8 +319,7 @@ class TStreamExecuteYqlScriptRPC
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << GatewayRequestHandlerActorId_);

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(*LastSeqNo_);
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);

ctx.Send(GatewayRequestHandlerActorId_, resp.Release());
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -848,10 +848,16 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));

{
for (size_t i = 10; i--;) {
std::vector<std::pair<TString, std::vector<i32>>> topics {{topicName, {0}}};
auto msg = client.Fetch(topics);

if (msg->Responses.empty() || msg->Responses[0].Partitions.empty() || !msg->Responses[0].Partitions[0].Records.has_value()) {
UNIT_ASSERT_C(i, "Timeout");
Sleep(TDuration::Seconds(1));
continue;
}

UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto record = msg->Responses[0].Partitions[0].Records->Records[0];

Expand All @@ -867,6 +873,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
auto headerValueStr = TString(headerValue.data(), headerValue.size());
UNIT_ASSERT_VALUES_EQUAL(dataStr, value);

break;
}

auto m = Read(topicReader);
Expand Down Expand Up @@ -1250,10 +1257,17 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS);
}

{
for (size_t i = 10; i--;) {
// Check CDC
std::vector<std::pair<TString, std::vector<i32>>> topics {{feedPath, {0}}};
auto msg = client.Fetch(topics);

if (msg->Responses.empty() || msg->Responses[0].Partitions.empty() || !msg->Responses[0].Partitions[0].Records.has_value()) {
UNIT_ASSERT_C(i, "Timeout");
Sleep(TDuration::Seconds(1));
continue;
}

UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
Expand All @@ -1264,6 +1278,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
auto data = record.Value.value();
auto dataStr = TString(data.data(), data.size());
UNIT_ASSERT_VALUES_EQUAL(dataStr, "{\"update\":{\"value\":2},\"key\":[1]}");

break;
}

} // Y_UNIT_TEST(FetchScenario)
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,19 @@ struct TEvKqpExecuter {
TKqpExecuterEvents::EvStreamData> {};

struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck,
TKqpExecuterEvents::EvStreamDataAck> {};
TKqpExecuterEvents::EvStreamDataAck>
{
friend class TEventPBBase;
explicit TEvStreamDataAck(ui64 seqno, ui64 channelId)
{
Record.SetSeqNo(seqno);
Record.SetChannelId(channelId);
}

private:
// using a little hack to hide default empty constructor
TEvStreamDataAck() = default;
};

struct TEvStreamProfile : public TEventPB<TEvStreamProfile, NKikimrKqp::TEvExecuterStreamProfile,
TKqpExecuterEvents::EvStreamProfile> {};
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ class TKqpScanQueryRequestHandler : public TRequestHandlerBase<
ResultSet.set_truncated(true);
}

auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetEnough(truncated);
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(ResultSetBytesLimit);
ctx.Send(ev->Sender, resp.Release());
}
Expand Down Expand Up @@ -497,8 +496,7 @@ class TKqpGenericQueryRequestHandler: public TRequestHandlerBase<
}
}

auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max());
Send(ev->Sender, response.Release());
}
Expand Down
26 changes: 13 additions & 13 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
if (!ExecutionEntryExists && status == Ydb::StatusIds::SUCCESS) {
status = Ydb::StatusIds::NOT_FOUND;
issues.AddIssue("No such execution");
issues.AddIssue("No such execution");
}

if (status == Ydb::StatusIds::SUCCESS) {
Expand Down Expand Up @@ -1796,26 +1796,26 @@ class TSaveScriptExecutionResultQuery : public TQueryBase {
.AddParam("$items");

param
.BeginList();
.BeginList();

auto row = FirstRow;
for (const auto& rowValue : ResultSet.rows()) {
auto rowValueSerialized = rowValue.SerializeAsString();
SavedSize += rowValueSerialized.size();
param
.AddListItem()
.BeginStruct()
.AddMember("row_id")
.Int64(row++)
.AddMember("result_set")
.String(std::move(rowValueSerialized))
.AddMember("accumulated_size")
.Int64(AccumulatedSize + SavedSize)
.EndStruct();
.AddListItem()
.BeginStruct()
.AddMember("row_id")
.Int64(row++)
.AddMember("result_set")
.String(std::move(rowValueSerialized))
.AddMember("accumulated_size")
.Int64(AccumulatedSize + SavedSize)
.EndStruct();
}
param
.EndList()
.Build();
.EndList()
.Build();

RunDataQuery(sql, &params);
}
Expand Down
Loading
Loading