@@ -162,6 +162,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
162
162
EvCloseFileResponse = TEvDqSpillingLocalFile::EEv::LastEvent + 1 ,
163
163
EvWriteFileResponse,
164
164
EvReadFileResponse,
165
+ EvRemoveOldTmp,
165
166
166
167
LastEvent
167
168
};
@@ -192,6 +193,15 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
192
193
bool Removed = false ;
193
194
TMaybe<TString> Error;
194
195
};
196
+
197
+ struct TEvRemoveOldTmp : public TEventLocal <TEvRemoveOldTmp, EvRemoveOldTmp> {
198
+ TFsPath TmpRoot;
199
+ ui32 NodeId;
200
+ TString GuidString;
201
+
202
+ TEvRemoveOldTmp (TFsPath tmpRoot, ui32 nodeId, TString guidString)
203
+ : TmpRoot(std::move(tmpRoot)), NodeId(nodeId), GuidString(std::move(guidString)) {}
204
+ };
195
205
};
196
206
197
207
struct TFileDesc ;
@@ -209,40 +219,16 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
209
219
210
220
void Bootstrap () {
211
221
Root_ = Config_.Root ;
212
- const auto root = Root_;
213
-
214
- const auto nodeIdString = ToString (SelfId ().NodeId ());
222
+ const auto nodeId = SelfId ().NodeId ();
215
223
const auto guidString = TGUID::Create ().AsGuidString ();
216
224
217
- Root_ /= (TStringBuilder () << " node_" << nodeIdString << " _" << guidString);
225
+ Send (SelfId (), MakeHolder<TEvPrivate::TEvRemoveOldTmp>(Root_, nodeId, guidString));
226
+
227
+ Root_ /= (TStringBuilder () << " node_" << nodeId << " _" << guidString);
218
228
Cerr << " Root from config: " << Config_.Root << " , Root_: " << Root_ << " \n " ;
219
229
220
230
LOG_I (" Init DQ local file spilling service at " << Root_ << " , actor: " << SelfId ());
221
231
222
- {
223
- Cerr << " Traverse:\n " ;
224
- TDirIterator iter (root, TDirIterator::TOptions ().SetMaxLevel (1 ));
225
- TVector<TString> old_tmps;
226
- for (const auto &dir_entry : iter) {
227
- if (dir_entry.fts_info == FTS_DP) {
228
- // skip postorder visit
229
- continue ;
230
- }
231
- TString dir_name = dir_entry.fts_name ;
232
- TVector<TString> parts;
233
- StringSplitter (dir_name).Split (' _' ).Collect (&parts);
234
-
235
- if (parts.size () == 3 && parts[0 ] == " node" && parts[1 ] == nodeIdString && parts[2 ] != guidString) {
236
- Cerr << " Found old temporary at '" << (root / dir_name) << " '\n " ;
237
- old_tmps.emplace_back (std::move (dir_name));
238
- }
239
- }
240
-
241
- ForEach (old_tmps.begin (), old_tmps.end (), [&root](const auto & dir_name) {
242
- (root / dir_name).ForceDelete ();
243
- });
244
- }
245
-
246
232
try {
247
233
if (Root_.IsSymlink ()) {
248
234
throw TIoException () << Root_ << " is a symlink, can not start Spilling Service" ;
@@ -304,6 +290,7 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
304
290
hFunc(TEvPrivate::TEvWriteFileResponse, HandleWork)
305
291
hFunc(TEvDqSpilling::TEvRead, HandleWork)
306
292
hFunc(TEvPrivate::TEvReadFileResponse, HandleWork)
293
+ hFunc(TEvPrivate::TEvRemoveOldTmp, HandleWork)
307
294
hFunc(NMon::TEvHttpInfo, HandleWork)
308
295
cFunc(TEvents::TEvPoison::EventType, PassAway)
309
296
);
@@ -745,6 +732,37 @@ class TDqLocalFileSpillingService : public TActorBootstrapped<TDqLocalFileSpilli
745
732
Send (ev->Sender , new NMon::TEvHttpInfoRes (s.Str ()));
746
733
}
747
734
735
+ void HandleWork (TEvPrivate::TEvRemoveOldTmp::TPtr& ev) {
736
+ auto & msg = *ev->Get ();
737
+ auto & root = msg.TmpRoot ;
738
+ auto nodeIdString = ToString (msg.NodeId );
739
+ auto & guidString = msg.GuidString ;
740
+
741
+ LOG_I (" [RemoveOldTmp] removing at root: " << root);
742
+
743
+ TDirIterator iter (root, TDirIterator::TOptions ().SetMaxLevel (1 ));
744
+
745
+ TVector<TString> oldTmps;
746
+ for (const auto &dirEntry : iter) {
747
+ if (dirEntry.fts_info == FTS_DP) {
748
+ // skip postorder visit
749
+ continue ;
750
+ }
751
+ TString dirName = dirEntry.fts_name ;
752
+ TVector<TString> parts;
753
+ StringSplitter (dirName).Split (' _' ).Collect (&parts);
754
+
755
+ if (parts.size () == 3 && parts[0 ] == " node" && parts[1 ] == nodeIdString && parts[2 ] != guidString) {
756
+ LOG_D (" [RemoveOldTmp] found old temporary at " << (root / dirName));
757
+ oldTmps.emplace_back (std::move (dirName));
758
+ }
759
+ }
760
+
761
+ ForEach (oldTmps.begin (), oldTmps.end (), [&root](const auto & dirName) {
762
+ (root / dirName).ForceDelete ();
763
+ });
764
+ }
765
+
748
766
private:
749
767
void RunOp (TStringBuf opName, THolder<IObjectInQueue> op, TFileDesc& fd) {
750
768
if (fd.HasActiveOp ) {
0 commit comments