Skip to content

Various fixes to the DAOS tensorflow-io plugin. #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 87 additions & 56 deletions tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
#include "tensorflow_io/core/filesystems/dfs/dfs_utils.h"

#include <stdio.h>
#undef NDEBUG
#include <cassert>

namespace tensorflow {
namespace io {
namespace dfs {


// SECTION 1. Implementation for `TF_RandomAccessFile`
// ----------------------------------------------------------------------------
namespace tf_random_access_file {
typedef struct DFSRandomAccessFile {
std::string dfs_path;
dfs_t* daos_fs;
DAOS_FILE daos_file;
dfs_obj_t *daos_file;
std::vector<ReadBuffer> buffers;
daos_size_t file_size;
daos_handle_t mEventQueueHandle{};

DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
: dfs_path(std::move(dfs_path)) {
daos_fs = file_system;
daos_file.file = obj;
daos_file = obj;
dfs_get_size(daos_fs, obj, &file_size);
size_t num_of_buffers;
size_t buff_size;
int rc = daos_eq_create(&mEventQueueHandle);
assert(rc == 0);

if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
num_of_buffers = atoi(env_num_of_buffers);
Expand All @@ -42,12 +49,12 @@ typedef struct DFSRandomAccessFile {

void Cleanup(TF_RandomAccessFile* file) {
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
for (auto& buffer : dfs_file->buffers) {
buffer.FinalizeEvent();
}
dfs_file->buffers.clear();

daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
dfs_release(dfs_file->daos_file.file);
int rc = daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
assert(rc == 0);
rc = dfs_release(dfs_file->daos_file);
assert(rc == 0);
dfs_file->daos_fs = nullptr;
delete dfs_file;
}
Expand All @@ -65,14 +72,19 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
int64_t total_bytes = 0;
size_t ret_size = offset + n;
while (curr_offset < ret_size && curr_offset < dfs_file->file_size) {
size_t read_bytes = 0;
int64_t read_bytes = 0;
for (auto& read_buf : dfs_file->buffers) {
if (read_buf.CacheHit(curr_offset)) {
read_bytes = read_buf.CopyFromCache(ret, ret_offset, curr_offset, n,
dfs_file->file_size, status);
break;
}
}

if (read_bytes < 0) {
return -1;
}

if (read_bytes > 0) {
curr_offset += read_bytes;
ret_offset += read_bytes;
Expand All @@ -81,30 +93,13 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
continue;
}

size_t async_offset = curr_offset + BUFF_SIZE;
for (size_t i = 1; i < dfs_file->buffers.size(); i++) {
size_t async_offset = curr_offset;
for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
if (async_offset > dfs_file->file_size) break;
dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs,
dfs_file->daos_file.file, async_offset);
dfs_file->daos_file, async_offset);
async_offset += BUFF_SIZE;
}

dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file,
curr_offset);

read_bytes = dfs_file->buffers[0].CopyFromCache(
ret, ret_offset, curr_offset, n, dfs_file->file_size, status);

curr_offset += read_bytes;
ret_offset += read_bytes;
total_bytes += read_bytes;
n -= read_bytes;

if (curr_offset >= dfs_file->file_size) {
for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
dfs_file->buffers[i].WaitEvent();
}
}
}

return total_bytes;
Expand All @@ -118,17 +113,42 @@ namespace tf_writable_file {
typedef struct DFSWritableFile {
std::string dfs_path;
dfs_t* daos_fs;
DAOS_FILE daos_file;
dfs_obj_t *daos_file;
daos_size_t file_size;
bool size_known;

DFSWritableFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
: dfs_path(std::move(dfs_path)) {
daos_fs = file_system;
daos_file.file = obj;
daos_file = obj;
size_known=false;
}

int get_file_size(daos_size_t &size) {
if (!size_known) {
int rc = dfs_get_size(daos_fs, daos_file, &file_size);
if (rc != 0) {
return rc;
}
size_known = true;
}
size = file_size;
return 0;
}

void set_file_size(daos_size_t size) {
file_size = size;
size_known = true;
}

void unset_file_size(void) {
size_known = false;
}
} DFSWritableFile;

void Cleanup(TF_WritableFile* file) {
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
dfs_release(dfs_file->daos_file.file);
dfs_release(dfs_file->daos_file);
dfs_file->daos_fs = nullptr;
delete dfs_file;
}
Expand All @@ -144,32 +164,44 @@ void Append(const TF_WritableFile* file, const char* buffer, size_t n,
wsgl.sg_nr = 1;
wsgl.sg_iovs = &iov;

daos_size_t size;
dfs_get_size(dfs_file->daos_fs, dfs_file->daos_file.file, &size);
dfs_file->daos_file.offset = size;
daos_size_t cur_file_size;
rc = dfs_file->get_file_size(cur_file_size);
if (rc != 0) {
TF_SetStatus(status, TF_INTERNAL, "Cannot determine file size");
return;
}

rc = dfs_write(dfs_file->daos_fs, dfs_file->daos_file.file, &wsgl,
dfs_file->daos_file.offset, NULL);
rc = dfs_write(dfs_file->daos_fs, dfs_file->daos_file, &wsgl,
cur_file_size, NULL);
if (rc) {
TF_SetStatus(status, TF_RESOURCE_EXHAUSTED, "");
dfs_file->unset_file_size();
return;
}

dfs_file->set_file_size(cur_file_size + n);
TF_SetStatus(status, TF_OK, "");
}

int64_t Tell(const TF_WritableFile* file, TF_Status* status) {
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);

TF_SetStatus(status, TF_OK, "");
daos_size_t cur_file_size;
int rc = dfs_file->get_file_size(cur_file_size);
if (rc != 0) {
TF_SetStatus(status, TF_INTERNAL, "Cannot determine file size");
return -1;
}

return dfs_file->daos_file.offset;
TF_SetStatus(status, TF_OK, "");
return cur_file_size;
}

void Close(const TF_WritableFile* file, TF_Status* status) {
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
dfs_release(dfs_file->daos_file.file);
dfs_release(dfs_file->daos_file);
dfs_file->daos_fs = nullptr;
dfs_file->daos_file.file = nullptr;
dfs_file->daos_file = nullptr;
TF_SetStatus(status, TF_OK, "");
}

Expand Down Expand Up @@ -206,7 +238,7 @@ void NewFile(const TF_Filesystem* filesystem, const char* path, File_Mode mode,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, file_path;
Expand All @@ -222,7 +254,7 @@ void NewWritableFile(const TF_Filesystem* filesystem, const char* path,
if (TF_GetCode(status) != TF_OK) return;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
file->plugin_file =
Expand All @@ -237,13 +269,13 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
if (TF_GetCode(status) != TF_OK) return;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
auto random_access_file =
new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, obj);
random_access_file->buffers[0].ReadAsync(
daos->daos_fs, random_access_file->daos_file.file, 0);
daos->daos_fs, random_access_file->daos_file, 0);
file->plugin_file = random_access_file;
TF_SetStatus(status, TF_OK, "");
}
Expand All @@ -255,7 +287,7 @@ void NewAppendableFile(const TF_Filesystem* filesystem, const char* path,
if (TF_GetCode(status) != TF_OK) return;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
file->plugin_file =
Expand All @@ -268,7 +300,7 @@ void PathExists(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, file;
Expand All @@ -288,7 +320,7 @@ void CreateDir(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, dir_path;
Expand All @@ -304,7 +336,7 @@ static void RecursivelyCreateDir(const TF_Filesystem* filesystem,
std::string pool, cont, dir_path;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
rc = daos->Setup(path, pool, cont, dir_path, status);
Expand Down Expand Up @@ -333,13 +365,13 @@ void DeleteFileSystemEntry(const TF_Filesystem* filesystem, const char* path,
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();

if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}

rc = daos->Setup(path, pool, cont, dir_path, status);
if (rc) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}

Expand Down Expand Up @@ -376,7 +408,7 @@ bool IsDir(const TF_Filesystem* filesystem, const char* path,
std::string pool, cont, file;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return is_dir;
}
rc = daos->Setup(path, pool, cont, file, status);
Expand Down Expand Up @@ -411,7 +443,7 @@ int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return -1;
}
std::string pool, cont, file;
Expand Down Expand Up @@ -448,7 +480,7 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
int allow_cont_creation = 1;
Expand All @@ -473,7 +505,6 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,

daos->Connect(pool_src, cont_src, allow_cont_creation, status);
if (TF_GetCode(status) != TF_OK) {
TF_SetStatus(status, TF_NOT_FOUND, "");
return;
}

Expand Down Expand Up @@ -558,7 +589,7 @@ void Stat(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, dir_path;
Expand Down Expand Up @@ -598,7 +629,7 @@ int GetChildren(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return -1;
}
std::string pool, cont, dir_path;
Expand Down
Loading