Skip to content

Commit ce05f61

Browse files
committed
Fixes issues
1 parent 95d1fdc commit ce05f61

18 files changed

+128
-271
lines changed

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,21 +112,14 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
112112
tableDesc->SetPath(tableDesc->GetPath() + SessionId);
113113
YQL_ENSURE(KqpTempTablesAgentActor != TActorId(),
114114
"Create temp table with empty KqpTempTablesAgentActor");
115-
tableDesc->SetOwnerActorId(KqpTempTablesAgentActor.ToString());
115+
ActorIdToProto(KqpTempTablesAgentActor, modifyScheme.MutableTempTableOwnerActorId());
116116
}
117117
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
118118
break;
119119
}
120120

121121
case NKqpProto::TKqpSchemeOperation::kDropTable: {
122122
auto modifyScheme = schemeOp.GetDropTable();
123-
if (Temporary) {
124-
auto* dropTable = modifyScheme.MutableDrop();
125-
dropTable->SetTemporary(true);
126-
YQL_ENSURE(KqpTempTablesAgentActor != TActorId(),
127-
"Drop temp table with empty KqpTempTablesAgentActor");
128-
dropTable->SetOwnerActorId(KqpTempTablesAgentActor.ToString());
129-
}
130123
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
131124
break;
132125
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1921,7 +1921,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
19211921

19221922
LOG_D("Cleanup temp tables: " << TempTablesState.TempTables.size());
19231923
auto tempTablesManager = CreateKqpTempTablesManager(
1924-
std::move(TempTablesState), SelfId(), KqpTempTablesAgentActor);
1924+
std::move(TempTablesState), SelfId());
19251925
RegisterWithSameMailbox(tempTablesManager);
19261926
return;
19271927
} else {

ydb/core/kqp/session_actor/kqp_session_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,6 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
4242
const TActorId& kqpTempTablesAgentActor);
4343

4444
IActor* CreateKqpTempTablesManager(
45-
TKqpTempTablesState tempTablesState, const TActorId& target, const TActorId& kqpTempTablesAgentActor);
45+
TKqpTempTablesState tempTablesState, const TActorId& target);
4646

4747
} // namespace NKikimr::NKqp

ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ class TKqpTempTablesManager : public TActorBootstrapped<TKqpTempTablesManager> {
4040
}
4141

4242
TKqpTempTablesManager(
43-
TKqpTempTablesState tempTablesState, const TActorId& target, const TActorId& kqpTempTablesAgentActor)
43+
TKqpTempTablesState tempTablesState, const TActorId& target)
4444
: TempTablesState(std::move(tempTablesState))
4545
, Target(target)
46-
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
4746
{}
4847

4948
void Bootstrap() {
@@ -66,8 +65,6 @@ class TKqpTempTablesManager : public TActorBootstrapped<TKqpTempTablesManager> {
6665
YQL_ENSURE(TempTablesState.SessionId, "Empty TempTablesState.SessionId");
6766

6867
drop->SetName(info.Name + *TempTablesState.SessionId);
69-
drop->SetOwnerActorId(KqpTempTablesAgentActor.ToString());
70-
drop->SetTemporary(true);
7168

7269
auto promise = NewPromise<IKqpGateway::TGenericResult>();
7370
IActor* requestHandler = new TSchemeOpRequestHandler(ev.Release(), promise, true);
@@ -112,16 +109,15 @@ class TKqpTempTablesManager : public TActorBootstrapped<TKqpTempTablesManager> {
112109
private:
113110
TKqpTempTablesState TempTablesState;
114111
const TActorId Target;
115-
const TActorId KqpTempTablesAgentActor;
116112
ui32 ResultsCount = 0;
117113
};
118114

119115
} // namespace
120116

121117
IActor* CreateKqpTempTablesManager(
122-
TKqpTempTablesState tempTablesState, const TActorId& target, const TActorId& kqpTempTablesAgentActor)
118+
TKqpTempTablesState tempTablesState, const TActorId& target)
123119
{
124-
return new TKqpTempTablesManager(tempTablesState, target, kqpTempTablesAgentActor);
120+
return new TKqpTempTablesManager(tempTablesState, target);
125121
}
126122

127123
} // namespace NKikimr::NKqp

ydb/core/protos/flat_scheme_op.proto

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import "ydb/core/protos/blob_depot_config.proto";
1414
import "ydb/public/api/protos/ydb_coordination.proto";
1515
import "ydb/public/api/protos/ydb_export.proto";
1616
import "ydb/public/api/protos/ydb_value.proto";
17+
import "ydb/library/actors/protos/actors.proto";
1718
import "ydb/library/mkql_proto/protos/minikql.proto";
1819
import "ydb/core/protos/index_builder.proto";
1920

@@ -36,8 +37,6 @@ message TDrop {
3637
optional string Name = 1;
3738
optional EDropWaitPolicy WaitPolicy = 2;
3839
optional uint64 Id = 3;
39-
optional bool Temporary = 4;
40-
optional string OwnerActorId = 5;
4140
}
4241

4342
enum EColumnCodec {
@@ -391,7 +390,6 @@ message TTableDescription {
391390
optional TTableReplicationConfig ReplicationConfig = 40;
392391

393392
optional bool Temporary = 41;
394-
optional string OwnerActorId = 42;
395393
}
396394

397395
message TDictionaryEncodingSettings {
@@ -1519,6 +1517,8 @@ message TModifyScheme {
15191517
optional bool FailedOnAlreadyExists = 63 [default = true];
15201518

15211519
optional TViewDescription CreateView = 64;
1520+
1521+
optional NActorsProto.TActorId TempTableOwnerActorId = 65;
15221522
}
15231523

15241524
// "Script", used by client to parse text files with multiple DDL commands

ydb/core/tx/schemeshard/schemeshard__background_cleaning.cpp

Lines changed: 54 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -4,122 +4,75 @@
44

55
namespace NKikimr::NSchemeShard {
66

7-
class TTempTableDropStarter: public TActorBootstrapped<TTempTableDropStarter> {
8-
public:
9-
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
10-
return NKikimrServices::TActivity::SCHEMESHARD_TEMP_TABLE_DROP_STARTER;
11-
}
12-
13-
explicit TTempTableDropStarter(const TActorId& ssActorId, THolder<TEvPrivate::TEvDropTempTable>&& req)
14-
: SSActorId(ssActorId)
15-
, Request(std::move(req)) // without txId
16-
{
17-
}
18-
19-
void Bootstrap() {
20-
AllocateTxId();
21-
Become(&TTempTableDropStarter::StateWork);
22-
}
23-
24-
STATEFN(StateWork) {
25-
switch (ev->GetTypeRewrite()) {
26-
hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle)
27-
sFunc(TEvents::TEvPoison, PassAway);
28-
}
29-
}
30-
31-
private:
32-
void AllocateTxId() {
33-
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
34-
}
35-
36-
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
37-
Request->TxId = ev->Get()->TxId;
38-
Send(SSActorId, Request.Release());
39-
PassAway();
7+
NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCleaning(const TPathId& pathId) {
8+
auto info = ResolveTempTableInfo(pathId);
9+
if (!info) {
10+
return NOperationQueue::EStartStatus::EOperationRemove;
4011
}
4112

42-
private:
43-
const TActorId SSActorId;
44-
THolder<TEvPrivate::TEvDropTempTable> Request;
45-
};
46-
47-
NOperationQueue::EStartStatus TSchemeShard::StartBackgroundCleaning(const TBackgroundCleaningInfo& info) {
4813
auto& tempTablesByOwner = TempTablesState.TempTablesByOwner;
4914

50-
auto it = tempTablesByOwner.find(info.second);
15+
auto it = tempTablesByOwner.find(info->OwnerActorId);
5116
if (it == tempTablesByOwner.end()) {
5217
return NOperationQueue::EStartStatus::EOperationRemove;
5318
}
5419

55-
auto tempTableIt = it->second.find(info.GetPathId());
20+
auto tempTableIt = it->second.find(pathId);
5621
if (tempTableIt == it->second.end()) {
5722
return NOperationQueue::EStartStatus::EOperationRemove;
5823
}
5924

6025
auto ctx = ActorContext();
6126
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "RunBackgroundCleaning "
62-
"for temp table# " << JoinPath({info.first.first, info.first.second})
63-
<< ", ownerId# " << info.second
27+
"for temp table# " << JoinPath({info->WorkingDir, info->Name})
28+
<< ", ownerId# " << info->OwnerActorId
6429
<< ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta()
6530
<< ", rate# " << BackgroundCleaningQueue->GetRate()
6631
<< ", in queue# " << BackgroundCleaningQueue->Size() << " cleaning events"
6732
<< ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events"
6833
<< " at schemeshard " << TabletID());
6934

70-
auto ev = MakeHolder<TEvPrivate::TEvDropTempTable>();
71-
ev->WorkingDir = info.first.first;
72-
ev->Name = info.first.second;
73-
ev->PathId = info.GetPathId();
74-
75-
ctx.Register(new TTempTableDropStarter(ctx.SelfID, std::move(ev)));
76-
77-
return NOperationQueue::EStartStatus::EOperationRunning;
78-
}
79-
80-
void TSchemeShard::HandleBackgroundCleaningCompletionResult(ui64 txId) {
81-
auto txsIt = BackgroundCleaningTxs.find(txId);
82-
if (txsIt == BackgroundCleaningTxs.end()) {
83-
return;
84-
}
85-
86-
auto tempTablePath = TPath::Init(txsIt->second, this);
87-
auto tempTableName = tempTablePath.LeafName();
88-
auto tempTableWorkingDir = tempTablePath.Parent().PathString();
89-
90-
BackgroundCleaningQueue->OnDone(TBackgroundCleaningInfo(
91-
std::move(tempTableWorkingDir),
92-
std::move(tempTableName),
93-
TActorId(),
94-
txsIt->second)
95-
);
96-
}
35+
auto txId = GetCachedTxId(ctx);
9736

98-
void TSchemeShard::Handle(TEvPrivate::TEvDropTempTable::TPtr& ev, const TActorContext& ctx) {
99-
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
100-
"Start to drop temp table: " << JoinPath({ev->Get()->WorkingDir, ev->Get()->Name}));
101-
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(ev->Get()->TxId), TabletID());
37+
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), TabletID());
10238
auto& record = propose->Record;
10339

10440
auto& modifyScheme = *record.AddTransaction();
10541
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropTable);
106-
modifyScheme.SetWorkingDir(ev->Get()->WorkingDir);
42+
modifyScheme.SetWorkingDir(info->WorkingDir);
43+
modifyScheme.SetInternal(true);
10744

10845
auto& drop = *modifyScheme.MutableDrop();
109-
drop.SetName(ev->Get()->Name);
46+
drop.SetName(info->Name);
11047

111-
ui64 txId = ev->Get()->TxId;
112-
BackgroundCleaningTxs[txId] = ev->Get()->PathId;
48+
BackgroundCleaningTxs[txId] = pathId;
11349

11450
Send(SelfId(), std::move(propose));
51+
52+
return NOperationQueue::EStartStatus::EOperationRunning;
11553
}
11654

117-
void TSchemeShard::OnBackgroundCleaningTimeout(const TBackgroundCleaningInfo& info) {
55+
void TSchemeShard::HandleBackgroundCleaningCompletionResult(const TTxId& txId) {
56+
const auto& pathId = BackgroundCleaningTxs.at(txId);
57+
11858
auto ctx = ActorContext();
59+
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Get BackgroundCleaning CompletionResult "
60+
"for txId# " << txId
61+
<< ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta()
62+
<< ", in queue# " << BackgroundCleaningQueue->GetRate() << " cleaning events"
63+
<< ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events"
64+
<< " at schemeshard " << TabletID());
65+
66+
BackgroundCleaningQueue->OnDone(pathId);
67+
}
11968

120-
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Background cleaning timeout "
121-
"for temp table# " << JoinPath({info.first.first, info.first.second})
122-
<< ", ownerId# " << info.second
69+
void TSchemeShard::OnBackgroundCleaningTimeout(const TPathId& pathId) {
70+
auto info = ResolveTempTableInfo(pathId);
71+
72+
auto ctx = ActorContext();
73+
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "BackgroundCleaning timeout "
74+
"for temp table# " << JoinPath({info->WorkingDir, info->Name})
75+
<< ", ownerId# " << info->OwnerActorId
12376
<< ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta()
12477
<< ", in queue# " << BackgroundCleaningQueue->GetRate() << " cleaning events"
12578
<< ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events"
@@ -199,17 +152,7 @@ void TSchemeShard::RetryNodeSubscribe(ui32 nodeId) {
199152

200153
auto& currentTempTables = itTempTables->second;
201154
for (auto& pathId: currentTempTables) {
202-
auto tempTablePath = TPath::Init(pathId, this);
203-
auto tempTableName = tempTablePath.LeafName();
204-
auto tempTableWorkingDir = tempTablePath.Parent().PathString();
205-
206-
207-
EnqueueBackgroundCleaning(
208-
TBackgroundCleaningInfo(
209-
std::move(tempTableWorkingDir),
210-
std::move(tempTableName),
211-
ownerActorId,
212-
pathId));
155+
EnqueueBackgroundCleaning(pathId);
213156
}
214157
tempTablesByOwner.erase(itTempTables);
215158
}
@@ -249,16 +192,7 @@ bool TSchemeShard::CheckOwnerUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
249192
auto& currentTempTables = it->second;
250193

251194
for (auto& pathId: currentTempTables) {
252-
auto tempTablePath = TPath::Init(pathId, this);
253-
auto tempTableName = tempTablePath.LeafName();
254-
auto tempTableWorkingDir = tempTablePath.Parent().PathString();
255-
256-
EnqueueBackgroundCleaning(
257-
TBackgroundCleaningInfo(
258-
std::move(tempTableWorkingDir),
259-
std::move(tempTableName),
260-
ownerActorId,
261-
pathId));
195+
EnqueueBackgroundCleaning(pathId);
262196
}
263197
tempTablesByOwner.erase(it);
264198

@@ -281,62 +215,46 @@ bool TSchemeShard::CheckOwnerUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
281215
return true;
282216
}
283217

284-
void TSchemeShard::EnqueueBackgroundCleaning(const TBackgroundCleaningInfo& info) {
218+
void TSchemeShard::EnqueueBackgroundCleaning(const TPathId& pathId) {
285219
if (BackgroundCleaningQueue) {
286-
BackgroundCleaningQueue->Enqueue(std::move(info));
220+
BackgroundCleaningQueue->Enqueue(std::move(pathId));
287221
}
288222
}
289223

290-
void TSchemeShard::RemoveBackgroundCleaning(const TBackgroundCleaningInfo& info) {
224+
void TSchemeShard::RemoveBackgroundCleaning(const TPathId& pathId) {
291225
if (BackgroundCleaningQueue) {
292-
BackgroundCleaningQueue->Remove(std::move(info));
226+
BackgroundCleaningQueue->Remove(std::move(pathId));
293227
}
294228
}
295229

296230
void TSchemeShard::HandleBackgroundCleaningTransactionResult(
297231
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& result) {
298-
const auto txId = result->Get()->Record.GetTxId();
232+
const auto txId = TTxId(result->Get()->Record.GetTxId());
299233

300234
auto ctx = ActorContext();
301-
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
302-
"Get TransactionResult of temp table drop with txId: " << txId);
235+
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Get BackgroundCleaning TransactionResult "
236+
"for txId# " << txId
237+
<< ", next wakeup# " << BackgroundCleaningQueue->GetWakeupDelta()
238+
<< ", in queue# " << BackgroundCleaningQueue->GetRate() << " cleaning events"
239+
<< ", running# " << BackgroundCleaningQueue->RunningSize() << " cleaning events"
240+
<< " at schemeshard " << TabletID());
303241

304-
auto txsIt = BackgroundCleaningTxs.find(txId);
305-
if (txsIt == BackgroundCleaningTxs.end()) {
306-
return;
307-
}
242+
const auto& pathId = BackgroundCleaningTxs.at(txId);
308243

309244
const NKikimrScheme::TEvModifySchemeTransactionResult &record = result->Get()->Record;
310245

311246
switch (record.GetStatus()) {
312-
case NKikimrScheme::EStatus::StatusAlreadyExists:
247+
case NKikimrScheme::EStatus::StatusPathDoesNotExist:
313248
case NKikimrScheme::EStatus::StatusSuccess: {
314-
auto tempTablePath = TPath::Init(txsIt->second, this);
315-
auto tempTableName = tempTablePath.LeafName();
316-
auto tempTableWorkingDir = tempTablePath.Parent().PathString();
317-
318-
BackgroundCleaningQueue->OnDone(TBackgroundCleaningInfo(
319-
std::move(tempTableWorkingDir),
320-
std::move(tempTableName),
321-
TActorId(),
322-
txsIt->second)
323-
);
249+
BackgroundCleaningQueue->OnDone(pathId);
324250
break;
325251
}
326252
case NKikimrScheme::EStatus::StatusAccepted:
327-
Send(SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(txId));
253+
Send(SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(record.GetTxId()));
328254
break;
329255
default: {
330-
auto tempTablePath = TPath::Init(txsIt->second, this);
331-
auto tempTableName = tempTablePath.LeafName();
332-
auto tempTableWorkingDir = tempTablePath.Parent().PathString();
333-
334-
BackgroundCleaningQueue->OnDone(TBackgroundCleaningInfo(
335-
std::move(tempTableWorkingDir),
336-
std::move(tempTableName),
337-
TActorId(),
338-
txsIt->second)
339-
);
256+
BackgroundCleaningQueue->OnDone(pathId);
257+
EnqueueBackgroundCleaning(pathId);
340258
break;
341259
}
342260
}

0 commit comments

Comments
 (0)