Skip to content

Commit c8532fe

Browse files
authored
Merge 41c9e98 into bd147a9
2 parents bd147a9 + 41c9e98 commit c8532fe

File tree

5 files changed

+97
-12
lines changed

5 files changed

+97
-12
lines changed

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
4545
#include <library/cpp/monlib/service/pages/templates.h>
4646
#include <library/cpp/resource/resource.h>
47-
#include <util/generic/guid.h>
4847

4948
namespace NKikimr::NKqp {
5049

@@ -236,7 +235,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
236235
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
237236
TString spillingRoot = cfg.GetRoot();
238237
if (spillingRoot.empty()) {
239-
spillingRoot = TStringBuilder() << "/tmp/ydb_spilling_" << CreateGuidAsString() << "/";
238+
spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
240239
}
241240

242241
SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(

ydb/library/yql/dq/actors/spilling/spilling_file.cpp

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
#include <util/folder/path.h>
1414
#include <util/stream/file.h>
1515
#include <util/thread/pool.h>
16+
#include <util/generic/guid.h>
17+
#include <util/folder/iterator.h>
18+
#include <util/generic/vector.h>
19+
#include <util/folder/dirut.h>
20+
#include <util/system/user.h>
1621

1722
namespace NYql::NDq {
1823

@@ -159,6 +164,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
159164
EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1,
160165
EvWriteFileResponse,
161166
EvReadFileResponse,
167+
EvRemoveOldTmp,
162168

163169
LastEvent
164170
};
@@ -189,6 +195,15 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
189195
bool Removed = false;
190196
TMaybe<TString> Error;
191197
};
198+
199+
// Local event carrying the parameters of an old-temporaries cleanup:
// the directory to scan, the node id and the spilling session id.
struct TEvRemoveOldTmp : public TEventLocal<TEvRemoveOldTmp, EvRemoveOldTmp> {
    TEvRemoveOldTmp(TFsPath tmpRoot, ui32 nodeId, TString spillingSessionId)
        : TmpRoot(std::move(tmpRoot))
        , NodeId(nodeId)
        , SpillingSessionId(std::move(spillingSessionId))
    {
    }

    TFsPath TmpRoot;            // directory that is scanned for old temporaries
    ui32 NodeId;                // node id used in the `node_<nodeId>_...` name
    TString SpillingSessionId;  // session id passed by the sender
};
192207
};
193208

194209
struct TFileDesc;
@@ -206,8 +221,12 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
206221

207222
void Bootstrap() {
208223
Root_ = Config_.Root;
209-
Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId());
224+
const auto rootToRemoveOldTmp = Root_;
225+
const auto sessionId = Config_.SpillingSessionId;
226+
const auto nodeId = SelfId().NodeId();
210227

228+
Root_ /= (TStringBuilder() << "node_" << nodeId << "_" << sessionId);
229+
// Cerr << "Root from config: " << Config_.Root << ", actual root: " << Root_ << "\n";
211230
LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId());
212231

213232
try {
@@ -221,6 +240,8 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
221240
Become(&TDqLocalFileSpillingService::BrokenState);
222241
return;
223242
}
243+
244+
Send(SelfId(), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(Root_, nodeId, sessionId));
224245

225246
Become(&TDqLocalFileSpillingService::WorkState);
226247
}
@@ -271,6 +292,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
271292
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
272293
hFunc(TEvDqSpilling::TEvRead, HandleWork)
273294
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
295+
hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork)
274296
hFunc(NMon::TEvHttpInfo, HandleWork)
275297
cFunc(TEvents::TEvPoison::EventType, PassAway)
276298
);
@@ -712,6 +734,55 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
712734
Send(ev->Sender, new NMon::TEvHttpInfoRes(s.Str()));
713735
}
714736

737+
// Handles TEvRemoveOldTmp: walks `TmpRoot` (at most one level deep) and force-deletes
// every entry named `node_<NodeId>_<suffix>` whose suffix differs from the session id
// carried by the event. Exceptions derived from yexception are logged and swallowed.
//
// NOTE(review): the entries are looked up directly under `TmpRoot`, so the sender
// presumably has to pass the directory that CONTAINS the node_* directories, not the
// per-session directory itself — confirm against the Send() call in Bootstrap.
void HandleWork(TEvPrivate::TEvRemoveOldTmp::TPtr& ev) {
    const auto& msg = *ev->Get();
    const auto& root = msg.TmpRoot;
    const auto nodeIdString = ToString(msg.NodeId);
    const auto& sessionId = msg.SpillingSessionId;

    LOG_I("[RemoveOldTmp] removing at root: " << root);

    // Deliberately NOT `static`: the lambda captures locals by reference, and a
    // function-local static is initialised only once, so on any later call it
    // would read dangling references to the first call's locals.
    const auto isDirOldTmp = [&nodeIdString, &sessionId](const TString& dirName) -> bool {
        // dirName: node_<nodeId>_<sessionId>
        static constexpr size_t NodeIdBegin = 5;
        if (dirName.Size() < NodeIdBegin || dirName.substr(0, NodeIdBegin) != "node_") {
            return false;
        }
        const auto nodeIdEnd = dirName.find('_', NodeIdBegin);
        if (nodeIdEnd == TString::npos || dirName.substr(NodeIdBegin, nodeIdEnd - NodeIdBegin) != nodeIdString) {
            return false;
        }
        // An entry carrying the event's own session id is not "old".
        return dirName.substr(nodeIdEnd + 1) != sessionId;
    };

    try {
        TDirIterator iter(root, TDirIterator::TOptions().SetMaxLevel(1));

        // Collect names first and delete afterwards, so the tree is not
        // modified while it is being iterated.
        TVector<TString> oldTmps;
        for (const auto& dirEntry : iter) {
            if (dirEntry.fts_info == FTS_DP) {
                // skip postorder visit
                continue;
            }

            const TString dirName = dirEntry.fts_name;
            if (isDirOldTmp(dirName)) {
                LOG_D("[RemoveOldTmp] found old temporary at " << (root / dirName));
                oldTmps.emplace_back(dirName);
            }
        }

        for (const auto& dirName : oldTmps) {
            (root / dirName).ForceDelete();
        }
    } catch (const yexception& e) {
        LOG_E("[RemoveOldTmp] removing failed due to: " << e.what());
    }
}
785+
715786
private:
716787
void RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
717788
if (fd.HasActiveOp) {
@@ -952,6 +1023,13 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
9521023

9531024
} // anonymous namespace
9541025

1026+
// Returns `<system temp dir>/spilling-tmp-<username>`, creating the directory
// if it does not exist yet.
TFsPath GetTmpSpillingRootForCurrentUser() {
    TFsPath userRoot{GetSystemTempDir()};
    const TString leaf = "spilling-tmp-" + GetUsername();
    userRoot /= leaf;
    MakeDirIfNotExist(userRoot);
    return userRoot;
}
1032+
9551033
IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const TActorId& client,
9561034
bool removeBlobsAfterRead)
9571035
{

ydb/library/yql/dq/actors/spilling/spilling_file.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@
77

88
#include <util/system/types.h>
99
#include <util/generic/strbuf.h>
10+
#include <util/folder/path.h>
11+
#include <util/generic/guid.h>
1012

1113
namespace NYql::NDq {
1214

1315
struct TFileSpillingServiceConfig {
1416
TString Root;
17+
TString SpillingSessionId = CreateGuidAsString();
1518
ui64 MaxTotalSize = 0;
1619
ui64 MaxFileSize = 0;
1720
ui64 MaxFilePartSize = 0;
@@ -26,6 +29,8 @@ inline NActors::TActorId MakeDqLocalFileSpillingServiceID(ui32 nodeId) {
2629
return NActors::TActorId(nodeId, TStringBuf(name, 12));
2730
}
2831

32+
TFsPath GetTmpSpillingRootForCurrentUser();
33+
2934
NActors::IActor* CreateDqLocalFileSpillingActor(TTxId txId, const TString& details, const NActors::TActorId& client, bool removeBlobsAfterRead);
3035

3136
NActors::IActor* CreateDqLocalFileSpillingService(const TFileSpillingServiceConfig& config, TIntrusivePtr<TSpillingCounters> counters);

ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,19 @@ class TTestActorRuntime: public TTestActorRuntimeBase {
4848
return str;
4949
}
5050

51+
// Accessor for the stored spilling session id (SpillingSessionId_).
const TString& GetSpillingSessionId() const {
    const TString& sessionId = SpillingSessionId_;
    return sessionId;
}
54+
5155
TActorId StartSpillingService(ui64 maxTotalSize = 1000, ui64 maxFileSize = 500,
5256
ui64 maxFilePartSize = 100, const TFsPath& root = TFsPath::Cwd() / GetSpillingPrefix())
5357
{
5458
SpillingRoot_ = root;
59+
SpillingSessionId_ = CreateGuidAsString();
5560

5661
auto config = TFileSpillingServiceConfig{
5762
.Root = root.GetPath(),
63+
.SpillingSessionId = SpillingSessionId_,
5864
.MaxTotalSize = maxTotalSize,
5965
.MaxFileSize = maxFileSize,
6066
.MaxFilePartSize = maxFilePartSize
@@ -91,6 +97,7 @@ class TTestActorRuntime: public TTestActorRuntimeBase {
9197

9298
private:
9399
TFsPath SpillingRoot_;
100+
TString SpillingSessionId_;
94101
};
95102

96103
TBuffer CreateBlob(ui32 size, char symbol) {
@@ -303,8 +310,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
303310
auto spillingActor = runtime.StartSpillingActor(tester);
304311

305312
runtime.WaitBootstrap();
306-
307-
const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_";
313+
const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_";
308314

309315
for (ui32 i = 0; i < 5; ++i) {
310316
// Cerr << "---- store blob #" << i << Endl;
@@ -346,7 +352,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
346352

347353
runtime.WaitBootstrap();
348354

349-
const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "/1_test_";
355+
const TString filePrefix = TStringBuilder() << runtime.GetSpillingRoot().GetPath() << "/node_" << runtime.GetNodeId() << "_" << runtime.GetSpillingSessionId() << "/1_test_";
350356

351357
for (ui32 i = 0; i < 5; ++i) {
352358
// Cerr << "---- store blob #" << i << Endl;
@@ -393,8 +399,7 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
393399
auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvWriteResult>(tester);
394400
UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId);
395401
}
396-
397-
auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId()));
402+
auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId()) + "_" + runtime.GetSpillingSessionId());
398403
(runtime.GetSpillingRoot() / nodePath / "1_test_0").ForceDelete();
399404

400405
{

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,9 @@ class TLocalServiceHolder {
8080
TActorSetupCmd(resman, TMailboxType::Simple, 0));
8181

8282
if (withSpilling) {
83-
char tempDir[MAX_PATH];
84-
if (MakeTempDir(tempDir, nullptr) != 0)
85-
ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir;
83+
auto tempDir = NDq::GetTmpSpillingRootForCurrentUser();
8684

87-
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
85+
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = false}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
8886

8987
ServiceNode->AddLocalService(
9088
NDq::MakeDqLocalFileSpillingServiceID(nodeId),

0 commit comments

Comments
 (0)