Skip to content

Commit 2263ccf

Browse files
authored
Merge da6f670 into 4d1e99e
2 parents 4d1e99e + da6f670 commit 2263ccf

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

ydb/library/yql/dq/actors/spilling/spilling_file.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
#include <util/folder/path.h>
1414
#include <util/stream/file.h>
1515
#include <util/thread/pool.h>
16+
#include <util/generic/guid.h>
17+
#include <util/folder/iterator.h>
18+
#include <util/generic/vector.h>
1619

1720
namespace NYql::NDq {
1821

@@ -206,10 +209,40 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
206209

207210
void Bootstrap() {
208211
Root_ = Config_.Root;
209-
Root_ /= (TStringBuilder() << "node_" << SelfId().NodeId());
212+
const auto root = Root_;
213+
214+
const auto nodeIdString = ToString(SelfId().NodeId());
215+
const auto guidString = TGUID::Create().AsGuidString();
216+
217+
Root_ /= (TStringBuilder() << "node_" << nodeIdString << "_" << guidString);
218+
Cerr << "Root from config: " << Config_.Root << ", Root_: " << Root_ << "\n";
210219

211220
LOG_I("Init DQ local file spilling service at " << Root_ << ", actor: " << SelfId());
212221

222+
{
223+
Cerr << "Traverse:\n";
224+
TDirIterator iter(root, TDirIterator::TOptions().SetMaxLevel(1));
225+
TVector<TString> old_tmps;
226+
for (const auto &dir_entry : iter) {
227+
if (dir_entry.fts_info == FTS_DP) {
228+
// skip postorder visit
229+
continue;
230+
}
231+
TString dir_name = dir_entry.fts_name;
232+
TVector<TString> parts;
233+
StringSplitter(dir_name).Split('_').Collect(&parts);
234+
235+
if (parts.size() == 3 && parts[0] == "node" && parts[1] == nodeIdString && parts[2] != guidString) {
236+
Cerr << "Found old temporary at '" << (root / dir_name) << "'\n";
237+
old_tmps.emplace_back(std::move(dir_name));
238+
}
239+
}
240+
241+
ForEach(old_tmps.begin(), old_tmps.end(), [&root](const auto& dir_name) {
242+
(root / dir_name).ForceDelete();
243+
});
244+
}
245+
213246
try {
214247
if (Root_.IsSymlink()) {
215248
throw TIoException() << Root_ << " is a symlink, can not start Spilling Service";

ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ class TLocalServiceHolder {
8080
TActorSetupCmd(resman, TMailboxType::Simple, 0));
8181

8282
if (withSpilling) {
83-
char tempDir[MAX_PATH];
84-
if (MakeTempDir(tempDir, nullptr) != 0)
85-
ythrow yexception() << "LocalServiceHolder: Can't create temporary directory " << tempDir;
83+
const char * tempDir = "/tmp/spilling-service-tmp";
8684

87-
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = true}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
85+
MakeDirIfNotExist(tempDir);
86+
87+
auto spillingActor = NDq::CreateDqLocalFileSpillingService(NDq::TFileSpillingServiceConfig{.Root = tempDir, .CleanupOnShutdown = false}, MakeIntrusive<NDq::TSpillingCounters>(lwmGroup));
8888

8989
ServiceNode->AddLocalService(
9090
NDq::MakeDqLocalFileSpillingServiceID(nodeId),

0 commit comments

Comments
 (0)