Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sync_dir interface to Env #2884

Merged
merged 1 commit into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Add sync_dir interface to Env
when we need to ensure that **a newly-created file** is fully
synchronized back to disk, we should call `fsync()` on the parent
directory—that is, the directory containing the newly-created file.
That is to say, In this situation, we should call `fsync()` on
both the newly-created file and its parent directory.

Unfortunately, currently in Doris, in any scenario, directories
are not fsynced.

This patch adds `sync_dir()` interface first, laying the goundwork
for future fixes.

This patch also remove unneeded private method `dir_exists()`.
  • Loading branch information
lingbin committed Feb 11, 2020
commit da7bf2364fac43327449b195614095d733811ce9
6 changes: 3 additions & 3 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ class Status {
}

bool ok() const { return _state == nullptr; }

bool is_cancelled() const { return code() == TStatusCode::CANCELLED; }
bool is_mem_limit_exceeded() const { return code() == TStatusCode::MEM_LIMIT_EXCEEDED; }
bool is_thrift_rpc_error() const { return code() == TStatusCode::THRIFT_RPC_ERROR; }

bool is_end_of_file() const { return code() == TStatusCode::END_OF_FILE; }

bool is_not_found() const { return code() == TStatusCode::NOT_FOUND; }

bool is_already_exist() const { return code() == TStatusCode::ALREADY_EXIST; }
bool is_io_error() const {return code() == TStatusCode::IO_ERROR; }

// Convert into TStatus. Call this if 'status_container' contains an optional
// TStatus field named 'status'. This also sets __isset.status.
template <typename T>
Expand Down
39 changes: 23 additions & 16 deletions be/src/env/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class Env {
virtual Status new_writable_file(const std::string& fname,
std::unique_ptr<WritableFile>* result) = 0;


// Like the previous new_writable_file, but allows options to be
// specified.
virtual Status new_writable_file(const WritableFileOptions& opts,
Expand Down Expand Up @@ -139,9 +138,21 @@ class Env {
// Delete the named file.
virtual Status delete_file(const std::string& fname) = 0;

// Create the specified directory. Returns error if directory exists.
// Create the specified directory.
// NOTE: It will return error if the path already exist(not necessarily as a directory)
virtual Status create_dir(const std::string& dirname) = 0;

// Creates directory if missing.
// Return OK if it exists, or successful in Creating.
virtual Status create_dir_if_missing(const std::string& dirname, bool* created = nullptr) = 0;

// Delete the specified directory.
// NOTE: The dir must be empty.
virtual Status delete_dir(const std::string& dirname) = 0;

// Synchronize the entry for a specific directory.
virtual Status sync_dir(const std::string& dirname) = 0;

// Checks if the file is a directory. Returns an error if it doesn't
// exist, otherwise writes true or false into 'is_dir' appropriately.
virtual Status is_directory(const std::string& path, bool* is_dir) = 0;
Expand All @@ -154,13 +165,6 @@ class Env {
// All directory entries in 'path' must exist on the filesystem.
virtual Status canonicalize(const std::string& path, std::string* result) = 0;

// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
virtual Status create_dir_if_missing(const std::string& dirname) = 0;

// Delete the specified directory.
virtual Status delete_dir(const std::string& dirname) = 0;

virtual Status get_file_size(const std::string& fname, uint64_t* size) = 0;

// Store the last modification time of fname in *file_mtime.
Expand All @@ -170,11 +174,9 @@ class Env {
virtual Status rename_file(const std::string& src,
const std::string& target) = 0;

// Hard Link file src to target.
virtual Status link_file(const std::string& /*src*/,
const std::string& /*target*/) {
return Status::NotSupported("link file is not supported for this Env");
}
// create a hard-link
virtual Status link_file(const std::string& /*old_path*/,
const std::string& /*new_path*/) = 0;
};

struct RandomAccessFileOptions {
Expand Down Expand Up @@ -265,6 +267,8 @@ class RandomAccessFile {
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
// Note: To avoid user misuse, WritableFile's API should support only
// one of Append or PositionedAppend. We support only Append here.
class WritableFile {
public:
enum FlushMode {
Expand All @@ -276,10 +280,13 @@ class WritableFile {
virtual ~WritableFile() { }

// Append data to the end of the file
// Note: A WritableFile object must support either Append or
// PositionedAppend, so the users cannot mix the two.
virtual Status append(const Slice& data) = 0;

// If possible, uses scatter-gather I/O to efficiently append
// multiple buffers to a file. Otherwise, falls back to regular I/O.
//
// For implementation specific quirks and details, see comments in
// implementation source code (e.g., env_posix.cc)
virtual Status appendv(const Slice* data, size_t cnt) = 0;

// Pre-allocates 'size' bytes for the file in the underlying filesystem.
Expand Down
109 changes: 67 additions & 42 deletions be/src/env/env_posix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@ namespace doris {
using std::string;
using strings::Substitute;

// Close file descriptor when object goes out of scope.
class ScopedFdCloser {
public:
explicit ScopedFdCloser(int fd) : fd_(fd) {}

~ScopedFdCloser() {
int err;
RETRY_ON_EINTR(err, ::close(fd_));
if (PREDICT_FALSE(err != 0)) {
LOG(WARNING) << "Failed to close fd " << fd_;
}
}

private:
const int fd_;
};

static Status io_error(const std::string& context, int err_number) {
switch (err_number) {
case EACCES:
Expand All @@ -51,7 +68,7 @@ static Status io_error(const std::string& context, int err_number) {
return Status::IOError(context, err_number, errno_to_string(err_number));
}

Status do_sync(int fd, const string& filename) {
static Status do_sync(int fd, const string& filename) {
if (fdatasync(fd) < 0) {
return io_error(filename, errno);
}
Expand Down Expand Up @@ -595,14 +612,53 @@ class PosixEnv : public Env {
return Status::OK();
}

// Create the specified directory. Returns error if directory exists.
Status create_dir(const std::string& name) override {
if (mkdir(name.c_str(), 0755) != 0) {
return io_error(name, errno);
}
return Status::OK();
}

Status create_dir_if_missing(const string& dirname, bool* created = nullptr) override {
Status s = create_dir(dirname);
if (created != nullptr) {
*created = s.ok();
}

// Check that dirname is actually a directory.
if (s.is_already_exist()) {
bool is_dir = false;
RETURN_IF_ERROR(is_directory(dirname, &is_dir));
if (is_dir) {
return Status::OK();
} else {
return s.clone_and_append("path already exists but not a dir");
}
}
return s;
}

// Delete the specified directory.
Status delete_dir(const std::string& dirname) override {
if (rmdir(dirname.c_str()) != 0) {
return io_error(dirname, errno);
}
return Status::OK();
}

Status sync_dir(const string& dirname) override {
int dir_fd;
RETRY_ON_EINTR(dir_fd, open(dirname.c_str(), O_DIRECTORY|O_RDONLY));
if (dir_fd < 0) {
return io_error(dirname, errno);
}
ScopedFdCloser fd_closer(dir_fd);
if (fsync(dir_fd) != 0) {
return io_error(dirname, errno);
}
return Status::OK();
}

Status is_directory(const std::string& path, bool* is_dir) override {
struct stat path_stat;
if (stat(path.c_str(), &path_stat) != 0) {
Expand All @@ -615,6 +671,8 @@ class PosixEnv : public Env {
}

Status canonicalize(const std::string& path, std::string* result) override {
// NOTE: we must use free() to release the buffer retruned by realpath(),
// because the buffer is allocated by malloc(), see `man 3 realpath`.
std::unique_ptr<char[], FreeDeleter> r(realpath(path.c_str(), nullptr));
if (r == nullptr) {
return io_error(Substitute("Unable to canonicalize $0", path), errno);
Expand All @@ -623,29 +681,6 @@ class PosixEnv : public Env {
return Status::OK();
}

// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
Status create_dir_if_missing(const std::string& name) override {
if (mkdir(name.c_str(), 0755) != 0) {
if (errno != EEXIST) {
return io_error(name, errno);
} else if (!dir_exists(name)) { // Check that name is actually a
// directory.
// Message is taken from mkdir
return Status::IOError(name + " exists but is not a directory");
}
}
return Status::OK();
}

// Delete the specified directory.
Status delete_dir(const std::string& dirname) override {
if (rmdir(dirname.c_str()) != 0) {
return io_error(dirname, errno);
}
return Status::OK();
}

Status get_file_size(const string& fname, uint64_t* size) override {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
Expand All @@ -656,8 +691,7 @@ class PosixEnv : public Env {
return Status::OK();
}

Status get_file_modified_time(const std::string& fname,
uint64_t* file_mtime) override {
Status get_file_modified_time(const std::string& fname, uint64_t* file_mtime) override {
struct stat s;
if (stat(fname.c_str(), &s) !=0) {
return io_error(fname, errno);
Expand All @@ -673,27 +707,18 @@ class PosixEnv : public Env {
return Status::OK();
}

Status link_file(const std::string& src, const std::string& target) override {
if (link(src.c_str(), target.c_str()) != 0) {
return io_error(src, errno);
Status link_file(const std::string& old_path, const std::string& new_path) override {
if (link(old_path.c_str(), new_path.c_str()) != 0) {
return io_error(old_path, errno);
}
return Status::OK();
}

private:
bool dir_exists(const std::string& dname) {
struct stat statbuf;
if (stat(dname.c_str(), &statbuf) == 0) {
return S_ISDIR(statbuf.st_mode);
}
return false; // stat() failed return false
}
};

// Default Posix Env
Env* Env::Default() {
static PosixEnv default_env;
return &default_env;
static PosixEnv default_env;
return &default_env;
}

}
} // end namespace doris