Skip to content

Commit

Permalink
Add opendal fs implement
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaoew1991 committed Nov 14, 2023
1 parent 28ec02a commit 14c7b22
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 22 deletions.
15 changes: 11 additions & 4 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@ option(WITH_ASAN "Build with address sanitizer." OFF)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
include(libopendal)

find_package(Boost REQUIRED)
find_package(Arrow REQUIRED)
find_package(protobuf REQUIRED)
find_package(glog REQUIRED)
find_package(AWSSDK REQUIRED)

file(GLOB_RECURSE SRC_FILES src/*.cpp src/*.cc)
message(STATUS "SRC_FILES: ${SRC_FILES}")
add_library(milvus-storage ${SRC_FILES})
target_include_directories(milvus-storage PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include/milvus-storage ${CMAKE_CURRENT_SOURCE_DIR}/src)
target_link_libraries(milvus-storage PUBLIC arrow::libarrow arrow::libparquet Boost::boost protobuf::protobuf aws-sdk-cpp::aws-sdk-cpp)
target_link_libraries(milvus-storage PUBLIC glog::glog)
target_link_libraries(milvus-storage PUBLIC
arrow::libarrow
arrow::libparquet
Boost::boost
protobuf::protobuf
glog::glog
opendal
)

if (WITH_UT)
enable_testing()
Expand Down
55 changes: 55 additions & 0 deletions cpp/cmake/libopendal.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
function(build_opendal)
include(ExternalProject)
set(OPENDAL_NAME "libopendal_c${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(OPENDAL_PREFIX ${CMAKE_BINARY_DIR}/thirdparty/opendal_ep)

file(MAKE_DIRECTORY
"${OPENDAL_PREFIX}"
"${OPENDAL_PREFIX}/lib"
"${OPENDAL_PREFIX}/include"
)

if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(OPENDAL_BUILD_TYPE "debug")
else()
set(OPENDAL_BUILD_TYPE "release")
endif()

ExternalProject_Add(
opendal_ep
GIT_REPOSITORY https://github.com/apache/incubator-opendal.git
GIT_TAG main
PREFIX ${OPENDAL_PREFIX}
SOURCE_SUBDIR bindings/c
CONFIGURE_COMMAND echo "configure for opendal_ep"
BUILD_COMMAND cargo build --${OPENDAL_BUILD_TYPE}
BUILD_IN_SOURCE 1
INSTALL_COMMAND bash -c "cp ${OPENDAL_PREFIX}/src/opendal_ep/target/${OPENDAL_BUILD_TYPE}/${OPENDAL_NAME} ${OPENDAL_PREFIX}/lib/ && cp ${OPENDAL_PREFIX}/src/opendal_ep/bindings/c/include/opendal.h ${OPENDAL_PREFIX}/include/")


add_library(opendal STATIC IMPORTED)
set_target_properties(opendal
PROPERTIES
IMPORTED_GLOBAL TRUE
IMPORTED_LOCATION "${OPENDAL_PREFIX}/lib/${OPENDAL_NAME}"
INTERFACE_INCLUDE_DIRECTORIES "${OPENDAL_PREFIX}/include")
add_dependencies(opendal opendal_ep)
if(APPLE)
target_link_libraries(opendal INTERFACE "-framework CoreFoundation")
target_link_libraries(opendal INTERFACE "-framework Security")
target_link_libraries(opendal INTERFACE "-framework SystemConfiguration")
endif()

get_target_property(OPENDAL_IMPORTED_LOCATION opendal IMPORTED_LOCATION)
get_target_property(OPENDAL_INTERFACE_INCLUDE_DIRECTORIES opendal INTERFACE_INCLUDE_DIRECTORIES)
message("OPENDAL_IMPORTED_LOCATION: ${OPENDAL_IMPORTED_LOCATION}")
message("OPENDAL_INTERFACE_INCLUDE_DIRECTORIES: ${OPENDAL_INTERFACE_INCLUDE_DIRECTORIES}")
endfunction()

if (opendal_FOUND)
message("Found opendal: ${opendal_INCLUDE_DIRS}")
else()
build_opendal()
endif()


8 changes: 4 additions & 4 deletions cpp/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class StorageConan(ConanFile):
"with_asan": False,
"with_profiler": False,
"with_ut": True,
"arrow:with_s3": True,
"aws-sdk-cpp:config": True,
"aws-sdk-cpp:text-to-speech": False,
"aws-sdk-cpp:transfer": False,
# "arrow:with_s3": True,
# "aws-sdk-cpp:config": True,
# "aws-sdk-cpp:text-to-speech": False,
# "aws-sdk-cpp:transfer": False,
"arrow:filesystem_layer": True,
"arrow:dataset_modules": True,
"arrow:parquet": True,
Expand Down
65 changes: 65 additions & 0 deletions cpp/include/milvus-storage/common/opendal_fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <arrow/filesystem/filesystem.h>
#include <arrow/util/macros.h>
#include <arrow/util/uri.h>
#include "opendal.h"

namespace milvus_storage {

class OpendalOptions {
public:
static arrow::Result<OpendalOptions> FromUri(const arrow::internal::Uri& uri, std::string* out_path);

const std::unordered_map<std::string, std::string>& options() const { return options_; }

const std::string& at(const std::string& key) const { return options_.at(key); }

protected:
std::unordered_map<std::string, std::string> options_;
};

class OpendalFileSystem : public arrow::fs::FileSystem {
public:
~OpendalFileSystem() override;

std::string type_name() const override { return "opendal"; }

bool Equals(const FileSystem& other) const override;

arrow::Result<arrow::fs::FileInfo> GetFileInfo(const std::string& path) override;
arrow::Result<std::vector<arrow::fs::FileInfo>> GetFileInfo(const arrow::fs::FileSelector& select) override;
arrow::fs::FileInfoGenerator GetFileInfoGenerator(const arrow::fs::FileSelector& select) override {
throw std::runtime_error("Not implemented");
};

arrow::Status CreateDir(const std::string& path, bool recursive = true) override;

arrow::Status DeleteDir(const std::string& path) override;
arrow::Status DeleteDirContents(const std::string& path, bool missing_dir_ok = false) override;
arrow::Status DeleteRootDirContents() override { throw std::runtime_error("Not implemented"); }

arrow::Status DeleteFile(const std::string& path) override;

arrow::Status Move(const std::string& src, const std::string& dest) override;

arrow::Status CopyFile(const std::string& src, const std::string& dest) override;

arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenInputStream(const std::string& path) override;
arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> OpenInputFile(const std::string& path) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;

arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenAppendStream(
const std::string& path, const std::shared_ptr<const arrow::KeyValueMetadata>& metadata = {}) override;

/// Create a S3FileSystem instance from the given options.
static arrow::Result<std::shared_ptr<OpendalFileSystem>> Make(
const OpendalOptions& options, const arrow::io::IOContext& = arrow::io::default_io_context());

protected:
OpendalFileSystem(const OpendalOptions& options, const arrow::io::IOContext& io_context);
opendal_operator* operator_;
OpendalOptions options_;
};

} // namespace milvus_storage
36 changes: 22 additions & 14 deletions cpp/src/common/fs_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <cstdlib>
#include "common/log.h"
#include "common/macro.h"
#include "common/opendal_fs.h"

namespace milvus_storage {

Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string& uri, std::string* out_path) {
Expand All @@ -26,22 +28,28 @@ Result<std::shared_ptr<arrow::fs::FileSystem>> BuildFileSystem(const std::string
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }

if (schema == "s3") {
if (!arrow::fs::IsS3Initialized()) {
RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
std::atexit([]() {
auto status = arrow::fs::EnsureS3Finalized();
if (!status.ok()) {
LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
}
});
}
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));

if (schema == "opendal") {
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, OpendalOptions::FromUri(uri_parser, out_path));
ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, OpendalFileSystem::Make(option));
return std::shared_ptr<arrow::fs::FileSystem>(fs);
}

// if (schema == "s3") {
// if (!arrow::fs::IsS3Initialized()) {
// RETURN_ARROW_NOT_OK(arrow::fs::EnsureS3Initialized());
// std::atexit([]() {
// auto status = arrow::fs::EnsureS3Finalized();
// if (!status.ok()) {
// LOG_STORAGE_WARNING_ << "Failed to finalize S3: " << status.message();
// }
// });
// }
// ASSIGN_OR_RETURN_ARROW_NOT_OK(auto option, arrow::fs::S3Options::FromUri(uri_parser, out_path));
// ASSIGN_OR_RETURN_ARROW_NOT_OK(auto fs, arrow::fs::S3FileSystem::Make(option));
//
// return std::shared_ptr<arrow::fs::FileSystem>(fs);
// }
//
return Status::InvalidArgument("Unsupported schema: " + schema);
}
/**
Expand All @@ -57,4 +65,4 @@ std::string UriToPath(const std::string& uri) {
return std::string("");
}
}
}; // namespace milvus_storage
}; // namespace milvus_storage
Loading

0 comments on commit 14c7b22

Please sign in to comment.