Skip to content

Commit 014944a

Browse files
authored
Merge f5c9f17 into b1a2b73
2 parents b1a2b73 + f5c9f17 commit 014944a

File tree

2 files changed

+57
-5
lines changed

2 files changed

+57
-5
lines changed

ydb/library/yql/dq/actors/spilling/spilling_file.cpp

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
#include <util/folder/path.h>
1414
#include <util/stream/file.h>
1515
#include <util/thread/pool.h>
16+
#include <util/generic/guid.h>
17+
#include <util/folder/iterator.h>
18+
#include <util/generic/vector.h>
1619

1720
namespace NYql::NDq {
1821

@@ -159,6 +162,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
159162
EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1,
160163
EvWriteFileResponse,
161164
EvReadFileResponse,
165+
EvRemoveOldTmp,
162166

163167
LastEvent
164168
};
@@ -189,6 +193,15 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
189193
bool Removed = false;
190194
TMaybe<TString> Error;
191195
};
196+
197+
struct TEvRemoveOldTmp : public TEventLocal<TEvRemoveOldTmp, EvRemoveOldTmp> {
198+
TFsPath TmpRoot;
199+
ui32 NodeId;
200+
TString GuidString;
201+
202+
TEvRemoveOldTmp(TFsPath tmpRoot, ui32 nodeId, TString guidString)
203+
: TmpRoot(std::move(tmpRoot)), NodeId(nodeId), GuidString(std::move(guidString)) {}
204+
};
192205
};
193206

194207
struct TFileDesc;
@@ -206,7 +219,13 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
206219

207220
void Bootstrap() {
208221
Root_ = Config_.Root;
209-
Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId());
222+
const auto nodeId = SelfId().NodeId();
223+
const auto guidString = TGUID::Create().AsGuidString();
224+
225+
Send(SelfId(), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(Root_, nodeId, guidString));
226+
227+
Root_ /= (TStringBuilder() << "node_" << nodeId << "_" << guidString);
228+
Cerr << "Root from config: " << Config_.Root << ", Root_: " << Root_ << "\n";
210229

211230
LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId());
212231

@@ -271,6 +290,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
271290
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
272291
hFunc(TEvDqSpilling::TEvRead, HandleWork)
273292
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
293+
hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork)
274294
hFunc(NMon::TEvHttpInfo, HandleWork)
275295
cFunc(TEvents::TEvPoison::EventType, PassAway)
276296
);
@@ -712,6 +732,37 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
712732
Send(ev->Sender, new NMon::TEvHttpInfoRes(s.Str()));
713733
}
714734

735+
void HandleWork(TEvPrivate::TEvRemoveOldTmp::TPtr& ev) {
736+
auto& msg = *ev->Get();
737+
auto& root = msg.TmpRoot;
738+
auto nodeIdString = ToString(msg.NodeId);
739+
auto& guidString = msg.GuidString;
740+
741+
LOG_I("[RemoveOldTmp] removing at root: " << root);
742+
743+
TDirIterator iter(root, TDirIterator::TOptions().SetMaxLevel(1));
744+
745+
TVector<TString> oldTmps;
746+
for (const auto &dirEntry : iter) {
747+
if (dirEntry.fts_info == FTS_DP) {
748+
// skip postorder visit
749+
continue;
750+
}
751+
TString dirName = dirEntry.fts_name;
752+
TVector<TString> parts;
753+
StringSplitter(dirName).Split('_').Collect(&parts);
754+
755+
if (parts.size() == 3 && parts[0] == "node" && parts[1] == nodeIdString && parts[2] != guidString) {
756+
LOG_D("[RemoveOldTmp] found old temporary at " << (root / dirName));
757+
oldTmps.emplace_back(std::move(dirName));
758+
}
759+
}
760+
761+
ForEach(oldTmps.begin(), oldTmps.end(), [&root](const auto& dirName) {
762+
(root / dirName).ForceDelete();
763+
});
764+
}
765+
715766
private:
716767
void RunOp(TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
717768
if (fd.HasActiveOp) {

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,12 @@ class TLocalServiceHolder {
8080
TActorSetupCmd(resman, TMailboxType::Simple, 0));
8181

8282
if (withSpilling) {
83-
char tempDir[MAX_PATH];
84-
if (MakeTempDir(tempDir, nullptr) != 0)
85-
ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir;
83+
auto tempDir = TFsPath{GetSystemTempDir()};
84+
tempDir /= "spilling-service-tmp";
8685

87-
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
86+
MakeDirIfNotExist(tempDir);
87+
88+
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = false}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
8889

8990
ServiceNode->AddLocalService(
9091
NDq::MakeDqLocalFileSpillingServiceID(nodeId),

0 commit comments

Comments
 (0)