13
13
#include < util/folder/path.h>
14
14
#include < util/stream/file.h>
15
15
#include < util/thread/pool.h>
16
+ #include < util/generic/guid.h>
17
+ #include < util/folder/iterator.h>
18
+ #include < util/generic/vector.h>
19
+ #include < util/folder/dirut.h>
20
+ #include < util/system/user.h>
16
21
17
22
namespace NYql ::NDq {
18
23
@@ -159,6 +164,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
159
164
EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1 ,
160
165
EvWriteFileResponse,
161
166
EvReadFileResponse,
167
+ EvRemoveOldTmp,
162
168
163
169
LastEvent
164
170
};
@@ -189,6 +195,15 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
189
195
bool Removed = false ;
190
196
TMaybe<TString> Error;
191
197
};
198
+
199
+ struct TEvRemoveOldTmp : public TEventLocal <TEvRemoveOldTmp, EvRemoveOldTmp> {
200
+ TFsPath TmpRoot;
201
+ ui32 NodeId;
202
+ TString GuidString;
203
+
204
+ TEvRemoveOldTmp (TFsPath tmpRoot, ui32 nodeId, TString guidString)
205
+ : TmpRoot(std::move(tmpRoot)), NodeId(nodeId), GuidString(std::move(guidString)) {}
206
+ };
192
207
};
193
208
194
209
struct TFileDesc ;
@@ -206,8 +221,13 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
206
221
207
222
void Bootstrap () {
208
223
Root_ = Config_.Root ;
209
- Root_ /= (TStringBuilder () << " node_" << SelfId ().NodeId ());
224
+ const auto nodeId = SelfId ().NodeId ();
225
+ const auto guidString = TGUID::Create ().AsGuidString ();
210
226
227
+ Send (SelfId (), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(Root_, nodeId, guidString));
228
+
229
+ Root_ /= (TStringBuilder () << " node_" << nodeId << " _" << guidString);
230
+ Cerr << " Root from config: " << Config_.Root << " , actual root: " << Root_ << " \n " ;
211
231
LOG_I (" Init DQ local file spilling service at " << Root_ << " , actor: " << SelfId ());
212
232
213
233
try {
@@ -271,6 +291,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
271
291
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
272
292
hFunc(TEvDqSpilling::TEvRead, HandleWork)
273
293
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
294
+ hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork)
274
295
hFunc(NMon::TEvHttpInfo, HandleWork)
275
296
cFunc(TEvents::TEvPoison::EventType, PassAway)
276
297
);
@@ -712,6 +733,41 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
712
733
Send (ev->Sender , new NMon::TEvHttpInfoRes (s.Str ()));
713
734
}
714
735
736
+ void HandleWork (TEvPrivate::TEvRemoveOldTmp::TPtr& ev) {
737
+ auto & msg = *ev->Get ();
738
+ auto & root = msg.TmpRoot ;
739
+ auto nodeIdString = ToString (msg.NodeId );
740
+ auto & guidString = msg.GuidString ;
741
+
742
+ LOG_I (" [RemoveOldTmp] removing at root: " << root);
743
+
744
+ try {
745
+ TDirIterator iter (root, TDirIterator::TOptions ().SetMaxLevel (1 ));
746
+
747
+ TVector<TString> oldTmps;
748
+ for (const auto &dirEntry : iter) {
749
+ if (dirEntry.fts_info == FTS_DP) {
750
+ // skip postorder visit
751
+ continue ;
752
+ }
753
+ TString dirName = dirEntry.fts_name ;
754
+ TVector<TString> parts;
755
+ StringSplitter (dirName).Split (' _' ).Collect (&parts);
756
+
757
+ if (parts.size () == 3 && parts[0 ] == " node" && parts[1 ] == nodeIdString && parts[2 ] != guidString) {
758
+ LOG_D (" [RemoveOldTmp] found old temporary at " << (root / dirName));
759
+ oldTmps.emplace_back (std::move (dirName));
760
+ }
761
+ }
762
+
763
+ ForEach (oldTmps.begin (), oldTmps.end (), [&root](const auto & dirName) {
764
+ (root / dirName).ForceDelete ();
765
+ });
766
+ } catch (const yexception& e) {
767
+ LOG_E (" [RemoveOldTmp] removing failed due to: " << e.what ());
768
+ }
769
+ }
770
+
715
771
private:
716
772
void RunOp (TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
717
773
if (fd.HasActiveOp ) {
@@ -952,6 +1008,13 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
952
1008
953
1009
} // anonymous namespace
954
1010
1011
+ TFsPath GetTmpSpillingRootForCurrentUser () {
1012
+ auto root = TFsPath{GetSystemTempDir ()};
1013
+ root /= " spilling-tmp-" + GetUsername ();
1014
+ MakeDirIfNotExist (root);
1015
+ return root;
1016
+ }
1017
+
955
1018
IActor* CreateDqLocalFileSpillingActor (TTxId txId, const TString& details, const TActorId& client,
956
1019
bool removeBlobsAfterRead)
957
1020
{
0 commit comments