Skip to content

Commit

Permalink
Rework "file" kvstore write paths for better network file support.
Browse files Browse the repository at this point in the history
* When writing, stat before rename.
  Stat after rename was done to reduce spurious failures on some windows
  paths; we may need to watch this case more closely.
* Remove fallback stat-by-filenames;
  these should not be necessary when doing operations in the new order.
* Merge RAII FileLock class with WriteLockHelper class.
* Add verbose logging for Lock, Unlock, and Delete.

I ran this on a WSL instance, and it appears to resolve the wsl network filesystem issues in:
  #160 (comment)

PiperOrigin-RevId: 665436157
Change-Id: Ia8048936a636f181e53edd247c3847280a0d66ca
  • Loading branch information
laramiel authored and copybara-github committed Aug 20, 2024
1 parent d5edeac commit 2a44dc7
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 196 deletions.
2 changes: 1 addition & 1 deletion tensorstore/internal/os/file_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ using FileDescriptor = HANDLE; // HANDLE
/// File descriptor traits for use with `UniqueHandle`.
struct FileDescriptorTraits {
static FileDescriptor Invalid() { return ((FileDescriptor)-1); }
static void Close(FileDescriptor handle);
static void Close(FileDescriptor fd);
};

/// Representation of file metadata.
Expand Down
45 changes: 30 additions & 15 deletions tensorstore/internal/os/file_util_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ void UnlockFcntlLock(FileDescriptor fd) {
// This is not strictly necessary as the posix/linux locks will be released
// when the fd is closed, but it allows easier reasoning by making locking
// behave similarly across platforms.
TS_DETAIL_LOG_BEGIN << " fd=" << fd;
while (true) {
struct ::flock lock;
lock.l_type = F_UNLCK;
Expand All @@ -119,15 +120,15 @@ void UnlockFcntlLock(FileDescriptor fd) {
}
}
if (errno == EINTR) continue;
ABSL_LOG_FIRST_N(INFO, 1)
<< StatusFromOsError(errno, "Failed to release lock");
TS_DETAIL_LOG_ERROR << " fd=" << fd;
return;
}
ABSL_UNREACHABLE();
}
#endif

void UnlockFlockLock(FileDescriptor fd) {
TS_DETAIL_LOG_BEGIN << " fd=" << fd;
while (true) {
{
PotentiallyBlockingRegion region;
Expand All @@ -136,8 +137,7 @@ void UnlockFlockLock(FileDescriptor fd) {
}
}
if (errno == EINTR) continue;
ABSL_LOG_FIRST_N(INFO, 1)
<< StatusFromOsError(errno, "Failed to release lock");
TS_DETAIL_LOG_ERROR << " fd=" << fd;
return;
}
ABSL_UNREACHABLE();
Expand All @@ -146,6 +146,7 @@ void UnlockFlockLock(FileDescriptor fd) {
} // namespace

Result<UnlockFn> AcquireFdLock(FileDescriptor fd) {
TS_DETAIL_LOG_BEGIN << " fd=" << fd;
#if defined(F_OFD_SETLKW)
while (true) {
// This blocks until the lock is acquired (SETLKW). If any signal is
Expand All @@ -165,21 +166,25 @@ Result<UnlockFn> AcquireFdLock(FileDescriptor fd) {
{
PotentiallyBlockingRegion region;
if (::fcntl(fd, F_OFD_SETLKW, &lock) != -1) {
TS_DETAIL_LOG_END << " fd=" << fd;
return UnlockFcntlLock;
}
}
if (errno == EINTR) continue;
if (errno == EINVAL || errno == ENOTSUP) break;
TS_DETAIL_LOG_ERROR << " fd=" << fd;
return StatusFromOsError(errno, "Failed to lock file");
}
#endif
while (true) {
{
PotentiallyBlockingRegion region;
if (::flock(fd, LOCK_EX) != -1) {
TS_DETAIL_LOG_END << " fd=" << fd;
return UnlockFlockLock;
}
if (errno == EINTR) continue;
TS_DETAIL_LOG_ERROR << " fd=" << fd;
return StatusFromOsError(errno, "Failed to lock file");
}
}
Expand All @@ -188,23 +193,22 @@ Result<UnlockFn> AcquireFdLock(FileDescriptor fd) {

Result<UniqueFileDescriptor> OpenExistingFileForReading(
const std::string& path) {
TS_DETAIL_LOG_BEGIN << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
FileDescriptor fd;
{
PotentiallyBlockingRegion region;
fd = ::open(path.c_str(), O_RDONLY | O_CLOEXEC);
}
if (fd == FileDescriptorTraits::Invalid()) {
TS_DETAIL_LOG_ERROR << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(errno, "Failed to open: ", QuoteString(path));
}
TS_DETAIL_LOG_END << " path=" << tensorstore::QuoteString(path)
<< ", fd=" << fd;
TS_DETAIL_LOG_END << " path=" << QuoteString(path) << ", fd=" << fd;
return UniqueFileDescriptor(fd);
}

Result<UniqueFileDescriptor> OpenFileForWriting(const std::string& path) {
TS_DETAIL_LOG_BEGIN << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
FileDescriptor fd = FileDescriptorTraits::Invalid();
const auto attempt_open = [&] {
PotentiallyBlockingRegion region;
Expand All @@ -227,11 +231,10 @@ Result<UniqueFileDescriptor> OpenFileForWriting(const std::string& path) {
}
#endif
if (fd == FileDescriptorTraits::Invalid()) {
TS_DETAIL_LOG_ERROR << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(errno, "Failed to create: ", QuoteString(path));
}
TS_DETAIL_LOG_END << " path=" << tensorstore::QuoteString(path)
<< ", fd=" << fd;
TS_DETAIL_LOG_END << " path=" << QuoteString(path) << ", fd=" << fd;
return UniqueFileDescriptor(fd);
}

Expand Down Expand Up @@ -306,27 +309,39 @@ absl::Status TruncateFile(FileDescriptor fd) {

absl::Status RenameOpenFile(FileDescriptor fd, const std::string& old_name,
const std::string& new_name) {
TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", old_name=" << QuoteString(old_name)
<< ", new_name=" << QuoteString(new_name);
PotentiallyBlockingRegion region;
if (::rename(old_name.c_str(), new_name.c_str()) == 0) {
TS_DETAIL_LOG_END << " fd=" << fd << ", old_name=" << QuoteString(old_name)
<< ", new_name=" << QuoteString(new_name);
return absl::OkStatus();
}
TS_DETAIL_LOG_ERROR << " fd=" << fd << ", old_name=" << QuoteString(old_name)
<< ", new_name=" << QuoteString(new_name);
return StatusFromOsError(errno, "Failed to rename: ", QuoteString(old_name),
" to: ", QuoteString(new_name));
}

absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) {
TS_DETAIL_LOG_BEGIN << " fd=" << fd << ", path=" << QuoteString(path);
PotentiallyBlockingRegion region;
if (::unlink(path.c_str()) == 0) {
TS_DETAIL_LOG_END << " fd=" << fd;
return absl::OkStatus();
}
TS_DETAIL_LOG_ERROR << " fd=" << fd;
return StatusFromOsError(errno, "Failed to delete: ", QuoteString(path));
}

absl::Status DeleteFile(const std::string& path) {
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
PotentiallyBlockingRegion region;
if (::unlink(path.c_str()) == 0) {
TS_DETAIL_LOG_END << " path=" << QuoteString(path);
return absl::OkStatus();
}
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(errno, "Failed to delete: ", QuoteString(path));
}

Expand All @@ -353,13 +368,13 @@ absl::Status GetFileInfo(FileDescriptor fd, FileInfo* info) {
}

absl::Status GetFileInfo(const std::string& path, FileInfo* info) {
TS_DETAIL_LOG_BEGIN << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
PotentiallyBlockingRegion region;
if (::stat(path.c_str(), info) == 0) {
TS_DETAIL_LOG_END << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_END << " path=" << QuoteString(path);
return absl::OkStatus();
}
TS_DETAIL_LOG_ERROR << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(errno);
}

Expand Down
43 changes: 28 additions & 15 deletions tensorstore/internal/os/file_util_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,36 +173,41 @@ Result<DWORD> GetFileAttributes(const std::wstring& filename) {
#endif

void UnlockWin32Lock(FileDescriptor fd) {
TS_DETAIL_LOG_BEGIN << " handle=" << fd;
auto lock_offset = GetLockOverlapped();
// Ignore any errors.
::UnlockFileEx(fd, /*dwReserved=*/0, /*nNumberOfBytesToUnlockLow=*/1,
/*nNumberOfBytesToUnlockHigh=*/0,
/*lpOverlapped=*/&lock_offset);
TS_DETAIL_LOG_END << " handle=" << fd;
}

} // namespace

void FileDescriptorTraits::Close(FileDescriptor handle) {
TS_DETAIL_LOG_BEGIN << " handle=" << handle;
::CloseHandle(handle);
TS_DETAIL_LOG_END << " handle=" << handle;
void FileDescriptorTraits::Close(FileDescriptor fd) {
TS_DETAIL_LOG_BEGIN << " handle=" << fd;
::CloseHandle(fd);
TS_DETAIL_LOG_END << " handle=" << fd;
}

Result<UnlockFn> AcquireFdLock(FileDescriptor fd) {
TS_DETAIL_LOG_BEGIN << " handle=" << fd;
auto lock_offset = GetLockOverlapped();
if (::LockFileEx(fd, /*dwFlags=*/LOCKFILE_EXCLUSIVE_LOCK,
/*dwReserved=*/0,
/*nNumberOfBytesToLockLow=*/1,
/*nNumberOfBytesToLockHigh=*/0,
/*lpOverlapped=*/&lock_offset)) {
TS_DETAIL_LOG_END << " handle=" << fd;
return UnlockWin32Lock;
}
TS_DETAIL_LOG_ERROR << " handle=" << fd;
return StatusFromOsError(::GetLastError(), "Failed to lock file");
}

Result<UniqueFileDescriptor> OpenExistingFileForReading(
const std::string& path) {
TS_DETAIL_LOG_BEGIN << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
std::wstring wpath;
TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(path, wpath));

Expand All @@ -215,17 +220,16 @@ Result<UniqueFileDescriptor> OpenExistingFileForReading(
/*hTemplateFile=*/nullptr);

if (fd == FileDescriptorTraits::Invalid()) {
TS_DETAIL_LOG_ERROR << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(::GetLastError(),
"Failed to open: ", QuoteString(path));
}
TS_DETAIL_LOG_END << " path=" << tensorstore::QuoteString(path)
<< ", handle=" << fd;
TS_DETAIL_LOG_END << " path=" << QuoteString(path) << ", handle=" << fd;
return UniqueFileDescriptor(fd);
}

Result<UniqueFileDescriptor> OpenFileForWriting(const std::string& path) {
TS_DETAIL_LOG_BEGIN << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
std::wstring wpath;
TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(path, wpath));

Expand All @@ -242,12 +246,11 @@ Result<UniqueFileDescriptor> OpenFileForWriting(const std::string& path) {
/*hTemplateFile=*/nullptr);

if (fd == FileDescriptorTraits::Invalid()) {
TS_DETAIL_LOG_ERROR << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(::GetLastError(),
"Failed to create: ", QuoteString(path));
}
TS_DETAIL_LOG_END << " path=" << tensorstore::QuoteString(path)
<< ", handle=" << fd;
TS_DETAIL_LOG_END << " path=" << QuoteString(path) << ", handle=" << fd;
return UniqueFileDescriptor(fd);
}

Expand Down Expand Up @@ -310,11 +313,15 @@ absl::Status TruncateFile(FileDescriptor fd) {

absl::Status RenameOpenFile(FileDescriptor fd, const std::string& old_name,
const std::string& new_name) {
TS_DETAIL_LOG_BEGIN << " handle=" << fd
<< ", old_name=" << QuoteString(old_name)
<< ", new_name=" << QuoteString(new_name);
std::wstring wpath_new;
TENSORSTORE_RETURN_IF_ERROR(ConvertUTF8ToWindowsWide(new_name, wpath_new));

// Try using Posix semantics.
if (RenameFilePosix(fd, wpath_new)) {
TS_DETAIL_LOG_END << " handle=" << fd;
return absl::OkStatus();
}

Expand All @@ -330,9 +337,11 @@ absl::Status RenameOpenFile(FileDescriptor fd, const std::string& old_name,
// Try using MoveFileEx, which may not be atomic.
if (::MoveFileExW(wpath_old.c_str(), wpath_new.c_str(),
MOVEFILE_REPLACE_EXISTING | MOVEFILE_WRITE_THROUGH)) {
TS_DETAIL_LOG_END << " handle=" << fd;
return absl::OkStatus();
}
TS_DETAIL_LOG_ERROR << " handle=" << fd;
return StatusFromOsError(::GetLastError(),
"Failed to rename: ", QuoteString(old_name),
" to: ", QuoteString(new_name));
Expand All @@ -347,6 +356,7 @@ absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) {
// result in the normal read/write paths failing with an error. To avoid
// that problem, we first rename the file to a random name, with a suffix of
// `kLockSuffix` to prevent it from being included in List results.
TS_DETAIL_LOG_BEGIN << " handle=" << fd << ", path=" << QuoteString(path);
unsigned int buf[5];
for (int i = 0; i < 5; ++i) {
::rand_s(&buf[i]);
Expand Down Expand Up @@ -374,6 +384,7 @@ absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) {
}
// Attempt to delete the open handle using posix semantics?
if (DeleteFilePosix(fd)) {
TS_DETAIL_LOG_END << " handle=" << fd;
return absl::OkStatus();
}
#ifndef NDEBUG
Expand All @@ -383,8 +394,10 @@ absl::Status DeleteOpenFile(FileDescriptor fd, const std::string& path) {
#endif
// The file has been renamed, so delete the renamed file.
if (::DeleteFileW(wpath_temp.c_str())) {
TS_DETAIL_LOG_END << " handle=" << fd;
return absl::OkStatus();
}
TS_DETAIL_LOG_ERROR << " handle=" << fd;
return StatusFromOsError(::GetLastError(),
"Failed to delete: ", QuoteString(path));
}
Expand Down Expand Up @@ -431,7 +444,7 @@ absl::Status GetFileInfo(FileDescriptor fd, FileInfo* info) {
}
absl::Status GetFileInfo(const std::string& path, FileInfo* info) {
TS_DETAIL_LOG_BEGIN << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_BEGIN << " path=" << QuoteString(path);
// The typedef uses BY_HANDLE_FILE_INFO, which includes device and index
// metadata, and requires an open handle.
Expand All @@ -446,11 +459,11 @@ absl::Status GetFileInfo(const std::string& path, FileInfo* info) {
/*hTemplateFile=*/nullptr));
if (stat_fd.valid()) {
if (::GetFileInformationByHandle(stat_fd.get(), info)) {
TS_DETAIL_LOG_END << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_END << " path=" << QuoteString(path);
return absl::OkStatus();
}
}
TS_DETAIL_LOG_ERROR << " path=" << tensorstore::QuoteString(path);
TS_DETAIL_LOG_ERROR << " path=" << QuoteString(path);
return StatusFromOsError(::GetLastError());
}
Expand Down
2 changes: 1 addition & 1 deletion tensorstore/kvstore/file/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ tensorstore_cc_library(
"//tensorstore/util:result",
"//tensorstore/util:span",
"//tensorstore/util:status",
"//tensorstore/util:str_cat",
"//tensorstore/util/execution",
"//tensorstore/util/garbage_collection",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/functional:function_ref",
"@com_google_absl//absl/log:absl_check",
"@com_google_absl//absl/log:absl_log",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings",
Expand Down
Loading

0 comments on commit 2a44dc7

Please sign in to comment.