Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IsDirectory() to Env and FS #6711

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

### Public API Change
* Add NewFileChecksumGenCrc32cFactory to the file checksum public API, such that the builtin Crc32c based file checksum generator factory can be used by applications.
* Add IsDirectory to Env and FS to indicate if a path is a directory.

### New Features
* Added support for pipelined & parallel compression optimization for `BlockBasedTableBuilder`. This optimization makes block building, block compression and block appending a pipeline, and uses multiple threads to accelerate block compression. Users can set `CompressionOptions::parallel_threads` greater than 1 to enable compression parallelism.
Expand Down
10 changes: 10 additions & 0 deletions env/composite_env_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,12 @@ class CompositeEnvWrapper : public Env {
return file_system_->NewLogger(fname, io_opts, result, &dbg);
}

Status IsDirectory(const std::string& path, bool* is_dir) override {
IOOptions io_opts;
IODebugContext dbg;
return file_system_->IsDirectory(path, io_opts, is_dir, &dbg);
}

#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
Expand Down Expand Up @@ -1081,6 +1087,10 @@ class LegacyFileSystemWrapper : public FileSystem {
uint64_t* diskfree, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->GetFreeSpace(path, diskfree));
}
IOStatus IsDirectory(const std::string& path, const IOOptions& /*options*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
return status_to_io_status(target_->IsDirectory(path, is_dir));
}

private:
Env* target_;
Expand Down
12 changes: 12 additions & 0 deletions env/env_hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,18 @@ Status HdfsEnv::NewLogger(const std::string& fname,
return Status::OK();
}

Status HdfsEnv::IsDirectory(const std::string& path, bool* is_dir) {
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, path.c_str());
if (pFileInfo != nullptr) {
if (is_dir != nullptr) {
*is_dir = (pFileInfo->mKind == tObjectKindDirectory);
}
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(path, errno);
}

// The factory method for creating an HDFS Env
Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
*hdfs_env = new HdfsEnv(fsname);
Expand Down
33 changes: 32 additions & 1 deletion env/env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1937,7 +1937,13 @@ class TestEnv : public EnvWrapper {
int close_count;
};

class EnvTest : public testing::Test {};
class EnvTest : public testing::Test {
public:
EnvTest() : test_directory_(test::PerThreadDBPath("env_test")) {}

protected:
const std::string test_directory_;
};

TEST_F(EnvTest, Close) {
TestEnv* env = new TestEnv();
Expand Down Expand Up @@ -2090,6 +2096,31 @@ TEST_F(EnvTest, MultipleCompositeEnv) {
ASSERT_EQ(env2->GetBackgroundThreads(Env::HIGH), 8);
}

TEST_F(EnvTest, IsDirectory) {
Status s = Env::Default()->CreateDirIfMissing(test_directory_);
ASSERT_OK(s);
const std::string test_sub_dir = test_directory_ + "sub1";
const std::string test_file_path = test_directory_ + "file1";
ASSERT_OK(Env::Default()->CreateDirIfMissing(test_sub_dir));
bool is_dir = false;
ASSERT_OK(Env::Default()->IsDirectory(test_sub_dir, &is_dir));
ASSERT_TRUE(is_dir);
{
std::unique_ptr<FSWritableFile> wfile;
s = Env::Default()->GetFileSystem()->NewWritableFile(
test_file_path, FileOptions(), &wfile, /*dbg=*/nullptr);
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> fwriter;
fwriter.reset(new WritableFileWriter(std::move(wfile), test_file_path,
FileOptions(), Env::Default()));
constexpr char buf[] = "test";
s = fwriter->Append(buf);
ASSERT_OK(s);
}
ASSERT_OK(Env::Default()->IsDirectory(test_file_path, &is_dir));
ASSERT_FALSE(is_dir);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
22 changes: 22 additions & 0 deletions env/fs_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,28 @@ class PosixFileSystem : public FileSystem {
return IOStatus::OK();
}

IOStatus IsDirectory(const std::string& path, const IOOptions& /*opts*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
// First open
int fd = -1;
int flags = cloexec_flags(O_RDONLY, nullptr);
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(path.c_str(), flags);
}
if (fd < 0) {
return IOError("While open for IsDirectory()", path, errno);
}
struct stat sbuf;
if (fstat(fd, &sbuf) < 0) {
return IOError("While doing stat for IsDirectory()", path, errno);
}
if (nullptr != is_dir) {
*is_dir = S_ISDIR(sbuf.st_mode);
}
return IOStatus::OK();
}

FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override {
FileOptions optimized = file_options;
Expand Down
6 changes: 6 additions & 0 deletions hdfs/env_hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class HdfsEnv : public Env {
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;

Status IsDirectory(const std::string& path, bool* is_dir) override;

void Schedule(void (*function)(void* arg), void* arg, Priority pri = LOW,
void* tag = nullptr,
void (*unschedFunction)(void* arg) = 0) override {
Expand Down Expand Up @@ -329,6 +331,10 @@ class HdfsEnv : public Env {
return notsup;
}

Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) override {
return notsup;
}

virtual void Schedule(void (* /*function*/)(void* arg), void* /*arg*/,
Priority /*pri*/ = LOW, void* /*tag*/ = nullptr,
void (* /*unschedFunction*/)(void* arg) = 0) override {}
Expand Down
31 changes: 22 additions & 9 deletions include/rocksdb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class Env {
virtual Status ReopenWritableFile(const std::string& /*fname*/,
std::unique_ptr<WritableFile>* /*result*/,
const EnvOptions& /*options*/) {
return Status::NotSupported();
return Status::NotSupported("Env::ReopenWritableFile() not supported.");
}

// Reuse an existing file by renaming it and opening it as writable.
Expand Down Expand Up @@ -461,7 +461,7 @@ class Env {
virtual int GetBackgroundThreads(Priority pri = LOW) = 0;

virtual Status SetAllowNonOwnerAccess(bool /*allow_non_owner_access*/) {
return Status::NotSupported("Not supported.");
return Status::NotSupported("Env::SetAllowNonOwnerAccess() not supported.");
}

// Enlarge number of background worker threads of a specific thread pool
Expand Down Expand Up @@ -518,7 +518,7 @@ class Env {

// Returns the status of all threads that belong to the current Env.
virtual Status GetThreadList(std::vector<ThreadStatus>* /*thread_list*/) {
return Status::NotSupported("Not supported.");
return Status::NotSupported("Env::GetThreadList() not supported.");
}

// Returns the pointer to ThreadStatusUpdater. This function will be
Expand All @@ -537,7 +537,12 @@ class Env {
// Get the amount of free disk space
virtual Status GetFreeSpace(const std::string& /*path*/,
uint64_t* /*diskfree*/) {
return Status::NotSupported();
return Status::NotSupported("Env::GetFreeSpace() not supported.");
}

// Check whether the specified path is a directory
virtual Status IsDirectory(const std::string& /*path*/, bool* /*is_dir*/) {
return Status::NotSupported("Env::IsDirectory() not supported.");
}

virtual void SanitizeEnvOptions(EnvOptions* /*env_opts*/) const {}
Expand Down Expand Up @@ -599,14 +604,16 @@ class SequentialFile {
// of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop.
virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::NotSupported("InvalidateCache not supported.");
return Status::NotSupported(
"SequentialFile::InvalidateCache not supported.");
}

// Positioned Read for direct I/O
// If Direct I/O enabled, offset, n, and scratch should be properly aligned
virtual Status PositionedRead(uint64_t /*offset*/, size_t /*n*/,
Slice* /*result*/, char* /*scratch*/) {
return Status::NotSupported();
return Status::NotSupported(
"SequentialFile::PositionedRead() not supported.");
}

// If you're adding methods here, remember to add them to
Expand Down Expand Up @@ -709,7 +716,8 @@ class RandomAccessFile {
// of this file. If the length is 0, then it refers to the end of file.
// If the system is not caching the file contents, then this is a noop.
virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::NotSupported("InvalidateCache not supported.");
return Status::NotSupported(
"RandomAccessFile::InvalidateCache not supported.");
}

// If you're adding methods here, remember to add them to
Expand Down Expand Up @@ -767,7 +775,8 @@ class WritableFile {
// required is queried via GetRequiredBufferAlignment()
virtual Status PositionedAppend(const Slice& /* data */,
uint64_t /* offset */) {
return Status::NotSupported();
return Status::NotSupported(
"WritableFile::PositionedAppend() not supported.");
}

// Truncate is necessary to trim the file to the correct size
Expand Down Expand Up @@ -842,7 +851,7 @@ class WritableFile {
// If the system is not caching the file contents, then this is a noop.
// This call has no effect on dirty pages in the cache.
virtual Status InvalidateCache(size_t /*offset*/, size_t /*length*/) {
return Status::NotSupported("InvalidateCache not supported.");
return Status::NotSupported("WritableFile::InvalidateCache not supported.");
}

// Sync a file range with disk.
Expand Down Expand Up @@ -1279,6 +1288,10 @@ class EnvWrapper : public Env {

Status UnlockFile(FileLock* l) override { return target_->UnlockFile(l); }

Status IsDirectory(const std::string& path, bool* is_dir) override {
return target_->IsDirectory(path, is_dir);
}

Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
std::shared_ptr<DynamicLibrary>* result) override {
Expand Down
8 changes: 8 additions & 0 deletions include/rocksdb/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,10 @@ class FileSystem {
return IOStatus::NotSupported();
}

virtual IOStatus IsDirectory(const std::string& /*path*/,
const IOOptions& options, bool* is_dir,
IODebugContext* /*dgb*/) = 0;

// If you're adding methods here, remember to add them to EnvWrapper too.

private:
Expand Down Expand Up @@ -1193,6 +1197,10 @@ class FileSystemWrapper : public FileSystem {
uint64_t* diskfree, IODebugContext* dbg) override {
return target_->GetFreeSpace(path, options, diskfree, dbg);
}
IOStatus IsDirectory(const std::string& path, const IOOptions& options,
bool* is_dir, IODebugContext* dbg) override {
return target_->IsDirectory(path, options, is_dir, dbg);
}

private:
std::shared_ptr<FileSystem> target_;
Expand Down
12 changes: 12 additions & 0 deletions port/win/env_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,14 @@ Status WinEnvIO::NewLogger(const std::string& fname,
return s;
}

Status WinEnvIO::IsDirectory(const std::string& path, bool* is_dir) {
BOOL ret = RX_PathIsDirectory(RX_FN(path).c_str());
if (is_dir) {
*is_dir = ret ? true : false;
}
return Status::OK();
}

uint64_t WinEnvIO::NowMicros() {

if (GetSystemTimePreciseAsFileTime_ != NULL) {
Expand Down Expand Up @@ -1433,6 +1441,10 @@ Status WinEnv::NewLogger(const std::string& fname,
return winenv_io_.NewLogger(fname, result);
}

Status WinEnv::IsDirectory(const std::string& path, bool* is_dir) {
return winenv_io_.IsDirectory(path, is_dir);
}

uint64_t WinEnv::NowMicros() {
return winenv_io_.NowMicros();
}
Expand Down
4 changes: 4 additions & 0 deletions port/win/env_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class WinEnvIO {
virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result);

virtual Status IsDirectory(const std::string& path, bool* is_dir);

virtual uint64_t NowMicros();

virtual uint64_t NowNanos();
Expand Down Expand Up @@ -287,6 +289,8 @@ class WinEnv : public Env {
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;

Status IsDirectory(const std::string& path, bool* is_dir) override;

uint64_t NowMicros() override;

uint64_t NowNanos() override;
Expand Down
2 changes: 2 additions & 0 deletions port/win/port_win.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
#define RX_PathIsRelative PathIsRelativeW
#define RX_GetCurrentDirectory GetCurrentDirectoryW
#define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExW
#define RX_PathIsDirectory PathIsDirectoryW

#else

Expand All @@ -389,6 +390,7 @@ extern void SetCpuPriority(ThreadId id, CpuPriority priority);
#define RX_PathIsRelative PathIsRelativeA
#define RX_GetCurrentDirectory GetCurrentDirectoryA
#define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExA
#define RX_PathIsDirectory PathIsDirectoryA

#endif

Expand Down
8 changes: 4 additions & 4 deletions test_util/fault_injection_test_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
int frames;

explicit ErrorContext(uint32_t seed)
: rand(seed),
enable_error_injection(false),
callstack(nullptr),
frames(0) {}
: rand(seed),
enable_error_injection(false),
callstack(nullptr),
frames(0) {}
~ErrorContext() {
if (callstack) {
free(callstack);
Expand Down