Skip to content

[yql] Refresh file storage state at fork (YQL-17461) #878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions ydb/library/yql/core/file_storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <util/system/thread.h>

#include <functional>
#include <atomic>

#if defined(_unix_)
#include <pthread.h>
Expand Down Expand Up @@ -135,7 +136,7 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
private:
void Reinit() {
for (auto& v : Registered) {
v.ResetRandom();
v.ResetAtFork();
}
}

Expand All @@ -153,6 +154,9 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
, IsTemp(storagePath.empty())
, MaxFiles(maxFiles)
, MaxSize(maxSize)
, CurrentFiles(0)
, CurrentSize(0)
, Dirty(false)
{
// TFsPath is not thread safe. It can initialize internal Split at any time. Force do it right now
StorageDir.PathSplit();
Expand All @@ -172,8 +176,8 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
TAtforkReinit::Get().Register(this);
YQL_LOG(INFO) << "FileStorage initialized in " << StorageDir.GetPath().Quote()
<< ", temporary dir: " << ProcessTempDir.GetPath().Quote()
<< ", files: " << CurrentFiles
<< ", total size: " << CurrentSize;
<< ", files: " << CurrentFiles.load()
<< ", total size: " << CurrentSize.load();
}

~TImpl() {
Expand Down Expand Up @@ -219,8 +223,8 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
SetCacheFilePermissionsNoThrow(hardlinkFile);

if (NFs::HardLink(hardlinkFile, storageFile)) {
AtomicIncrement(CurrentFiles);
AtomicAdd(CurrentSize, fileSize);
++CurrentFiles;
CurrentSize += fileSize;
}
// Ignore HardLink fail. Another process managed to download before us
TouchFile(storageFile.c_str());
Expand Down Expand Up @@ -281,10 +285,10 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
const i64 newFileSize = Max<i64>(0, GetFileLength(dstStorageFile.c_str()));

if (!prevFileExisted) {
AtomicIncrement(CurrentFiles);
++CurrentFiles;
}

AtomicAdd(CurrentSize, newFileSize - prevFileSize);
CurrentSize += newFileSize - prevFileSize;
}

bool RemoveFromStorage(const TString& existingStorageFileName) {
Expand All @@ -300,19 +304,19 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
const bool result = NFs::Remove(storageFile);

if (result || !storageFile.Exists()) {
AtomicDecrement(CurrentFiles);
AtomicAdd(CurrentSize, -prevFileSize);
++CurrentFiles;
CurrentSize -= prevFileSize;
}

return result;
}

ui64 GetOccupiedSize() const {
return AtomicGet(CurrentSize);
return CurrentSize.load();
}

size_t GetCount() const {
return AtomicGet(CurrentFiles);
return CurrentFiles.load();
}

TString GetTempName() {
Expand Down Expand Up @@ -365,15 +369,17 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
CurrentSize = actualSize;
}

bool NeedToCleanup() {
return static_cast<ui64>(AtomicGet(CurrentFiles)) > MaxFiles ||
static_cast<ui64>(AtomicGet(CurrentSize)) > MaxSize;
bool NeedToCleanup() const {
return Dirty.load()
|| static_cast<ui64>(CurrentFiles.load()) > MaxFiles
|| static_cast<ui64>(CurrentSize.load()) > MaxSize;
}

void Cleanup() {
if (!NeedToCleanup()) {
return;
}
Dirty.store(false);

with_lock (CleanupLock) {
TVector<TString> names;
Expand Down Expand Up @@ -422,15 +428,17 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
}
}

AtomicSet(CurrentFiles, actualFiles);
AtomicSet(CurrentSize, actualSize);
CurrentFiles.store(actualFiles);
CurrentSize.store(actualSize);
}
}

void ResetRandom() {
void ResetAtFork() {
with_lock(RndLock) {
Rnd.ResetSeed();
}
// Force cleanup on next file add, because other processes may change the state
Dirty.store(true);
}

private:
Expand All @@ -441,8 +449,9 @@ class TStorage::TImpl: public TIntrusiveListItem<TImpl> {
const bool IsTemp;
const ui64 MaxFiles;
const ui64 MaxSize;
TAtomic CurrentFiles = 0;
TAtomic CurrentSize = 0;
std::atomic<i64> CurrentFiles = 0;
std::atomic<i64> CurrentSize = 0;
std::atomic_bool Dirty;
TMutex RndLock;
TRandGuid Rnd;
};
Expand Down