FSI cut-over: enable compressed S3 import
* Implement `ForeignDataImporter::importGeneralS3` to offer full S3
capabilities to all supported data wrappers that can import the
corresponding file format locally.
mattgara authored and andrewseidl committed Mar 17, 2022
1 parent 85a6c99 commit 414b192
Showing 12 changed files with 393 additions and 31 deletions.
12 changes: 10 additions & 2 deletions Archive/S3Archive.cpp
@@ -151,7 +151,9 @@ void S3Archive::init_for_read() {
// land entirely to be imported... (avro?)
const std::string S3Archive::land(const std::string& objkey,
std::exception_ptr& teptr,
const bool for_detection) {
const bool for_detection,
const bool allow_named_pipe_use,
const bool track_file_paths) {
// 7z file needs entire landing; other file types use a named pipe
static std::atomic<int64_t> seqno(((int64_t)getpid() << 32) | time(0));
// need a dummy ext b/c no-ext now indicate plain_text
@@ -163,6 +165,9 @@ const std::string S3Archive::land(const std::string& objkey,
#ifdef ENABLE_IMPORT_PARQUET
use_pipe = use_pipe && (nullptr == ext || 0 != strcmp(ext, ".parquet"));
#endif
if (!allow_named_pipe_use) { // override using a named pipe no matter the configuration
use_pipe = false;
}
if (use_pipe) {
if (mkfifo(file_path.c_str(), 0660) < 0) {
throw std::runtime_error("failed to create named pipe '" + file_path +
@@ -269,7 +274,10 @@ const std::string S3Archive::land(const std::string& objkey,
}
}

file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
if (track_file_paths) { // `file_paths` may be shared between threads, so is not
// thread-safe
file_paths.insert(std::pair<const std::string, const std::string>(objkey, file_path));
}
return file_path;
}

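The two new parameters let a caller bypass the named-pipe path and the shared `file_paths` bookkeeping when objects are landed from worker threads. A minimal usage sketch (the loop, `s3_archive`, `objkeys`, and `process_file` are hypothetical and not part of this commit):

std::exception_ptr teptr{nullptr};
for (const auto& objkey : objkeys) {
  // Fully materialize the object as a regular file (no named pipe) and skip
  // the shared `file_paths` map, since this loop may run on several threads.
  const auto local_path = s3_archive->land(objkey,
                                           teptr,
                                           /*for_detection=*/false,
                                           /*allow_named_pipe_use=*/false,
                                           /*track_file_paths=*/false);
  process_file(local_path);    // assumed per-file import step
  s3_archive->vacuum(objkey);  // delete the landed copy when finished
}
if (teptr) {
  std::rethrow_exception(teptr);
}
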
19 changes: 13 additions & 6 deletions Archive/S3Archive.h
@@ -71,7 +71,8 @@ class S3Archive : public Archive {
const bool plain_text,
const std::optional<std::string>& regex_path_filter,
const std::optional<std::string>& file_sort_order_by,
const std::optional<std::string>& file_sort_regex)
const std::optional<std::string>& file_sort_regex,
const std::string& s3_temp_dir_path = {})
: S3Archive(url, plain_text) {
this->s3_access_key = s3_access_key;
this->s3_secret_key = s3_secret_key;
@@ -82,10 +83,14 @@
this->file_sort_order_by = file_sort_order_by;
this->file_sort_regex = file_sort_regex;

// this must be local to heavydb not client
// or posix dir path accessible to heavydb
auto env_s3_temp_dir = getenv("TMPDIR");
s3_temp_dir = env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
if (s3_temp_dir_path.empty()) {
// this must be local to heavydb not client
// or posix dir path accessible to heavydb
auto env_s3_temp_dir = getenv("TMPDIR");
s3_temp_dir = env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
} else {
s3_temp_dir = s3_temp_dir_path;
}
}

~S3Archive() override {
@@ -109,7 +114,9 @@
#ifdef HAVE_AWS_S3
const std::string land(const std::string& objkey,
std::exception_ptr& teptr,
const bool for_detection);
const bool for_detection,
const bool allow_named_pipe_use = true,
const bool track_file_paths = true);
void vacuum(const std::string& objkey);
#else
const std::string land(const std::string& objkey,
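Because `s3_temp_dir_path` defaults to an empty string, existing call sites compile unchanged and keep the `TMPDIR`-then-`/tmp` fallback; only callers that pass a non-empty path stage downloads elsewhere. A stand-alone sketch of the selection logic (the helper name is hypothetical; in the commit the logic is inlined in the constructor above):

#include <cstdlib>
#include <string>

// Hypothetical helper mirroring the constructor logic shown above.
std::string resolve_s3_temp_dir(const std::string& s3_temp_dir_path) {
  if (!s3_temp_dir_path.empty()) {
    return s3_temp_dir_path;  // caller-supplied staging directory wins
  }
  // Fall back to TMPDIR, then /tmp, matching the pre-existing behavior.
  const char* env_s3_temp_dir = std::getenv("TMPDIR");
  return env_s3_temp_dir ? env_s3_temp_dir : "/tmp";
}
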
6 changes: 4 additions & 2 deletions DataMgr/ForeignStorage/ForeignDataWrapperFactory.cpp
@@ -31,12 +31,14 @@
#include "Shared/SysDefinitions.h"
#include "Shared/misc.h"

namespace {

namespace foreign_storage {
bool is_s3_uri(const std::string& file_path) {
const std::string s3_prefix = "s3://";
return file_path.find(s3_prefix) != std::string::npos;
}
} // namespace foreign_storage

namespace {


bool is_valid_data_wrapper(const std::string& data_wrapper_type) {
2 changes: 2 additions & 0 deletions DataMgr/ForeignStorage/ForeignDataWrapperFactory.h
@@ -23,6 +23,8 @@
namespace foreign_storage {
class UserMapping;

bool is_s3_uri(const std::string& file_path);

/**
* Verify if `source_type` is valid.
*/
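Exporting `is_s3_uri` from the header lets other translation units branch on S3 paths; note that it checks for the substring "s3://" anywhere in the path rather than strictly as a prefix. An illustrative dispatch (the two import helpers are hypothetical):

#include <string>
#include "DataMgr/ForeignStorage/ForeignDataWrapperFactory.h"

void import_file(const std::string& file_path) {
  if (foreign_storage::is_s3_uri(file_path)) {
    // e.g. "s3://bucket/prefix/data.csv.gz" -> land from S3 first
    import_from_s3(file_path);  // hypothetical helper
  } else {
    import_local_file(file_path);  // hypothetical helper
  }
}
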
14 changes: 14 additions & 0 deletions DataMgr/ForeignStorage/FsiChunkUtils.h
@@ -71,6 +71,20 @@ auto partition_for_threads(const std::vector<T>& items, size_t max_threads) {
return items_by_thread;
}

template <typename Container>
std::vector<std::future<void>> create_futures_for_workers(
const Container& items,
size_t max_threads,
std::function<void(const Container&)> lambda) {
auto items_per_thread = partition_for_threads(items, max_threads);
std::vector<std::future<void>> futures;
for (const auto& items : items_per_thread) {
futures.emplace_back(std::async(std::launch::async, lambda, items));
}

return futures;
}

const foreign_storage::ForeignTable& get_foreign_table_for_key(const ChunkKey& key);

bool is_system_table_chunk_key(const ChunkKey& chunk_key);
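A usage sketch of the new helper: items are split into at most `max_threads` batches by `partition_for_threads`, and one asynchronous task is launched per batch. Here `file_paths` and `import_single_file` are hypothetical; `g_max_import_threads` is the existing import thread cap used elsewhere in this commit:

std::function<void(const std::vector<std::string>&)> process_batch =
    [&](const std::vector<std::string>& paths) {
      for (const auto& path : paths) {
        import_single_file(path);  // assumed per-file work
      }
    };
auto futures =
    create_futures_for_workers(file_paths, g_max_import_threads, process_batch);

// Wait on every future before calling get(), so all workers have finished
// before any exception from one of them is re-thrown.
for (auto& future : futures) {
  future.wait();
}
for (auto& future : futures) {
  future.get();
}
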
22 changes: 8 additions & 14 deletions DataMgr/ForeignStorage/ParquetDataWrapper.cpp
@@ -534,26 +534,20 @@ void ParquetDataWrapper::populateChunkBuffers(const ChunkToBufferMap& required_b
chunk_key[CHUNK_KEY_FRAGMENT_IDX]);
}

auto hints_per_thread = partition_for_threads(col_frag_hints, g_max_import_threads);

std::vector<std::future<void>> futures;
for (const auto& hint_set : hints_per_thread) {
futures.emplace_back(std::async(std::launch::async, [&, hint_set, this] {
for (const auto& [col_id, frag_id] : hint_set) {
loadBuffersUsingLazyParquetChunkLoader(
col_id, frag_id, buffers_to_load, delete_buffer);
}
}));
}
std::function<void(const std::set<ForeignStorageMgr::ParallelismHint>&)> lambda =
[&, this](const std::set<ForeignStorageMgr::ParallelismHint>& hint_set) {
for (const auto& [col_id, frag_id] : hint_set) {
loadBuffersUsingLazyParquetChunkLoader(
col_id, frag_id, buffers_to_load, delete_buffer);
}
};
auto futures = create_futures_for_workers(col_frag_hints, g_max_import_threads, lambda);

// We wait on all futures, then call get because we want all threads to have finished
// before we propagate a potential exception.
for (auto& future : futures) {
future.wait();
}
for (auto& future : futures) {
future.wait();
}

for (auto& future : futures) {
future.get();
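A note on the refactor above: declaring `lambda` with an explicit `std::function` type, rather than passing the closure directly, is what lets `create_futures_for_workers` deduce its `Container` template parameter consistently; a raw lambda would not match the `std::function<void(const Container&)>` parameter during template argument deduction.
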
2 changes: 2 additions & 0 deletions ImportExport/CopyParams.h
@@ -61,6 +61,8 @@ struct CopyParams {
std::string s3_session_token = "";
std::string s3_region;
std::string s3_endpoint;
int32_t s3_max_concurrent_downloads =
8; // maximum number of concurrent file downloads from S3
// kafka related params
size_t retry_count;
size_t retry_wait;
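The new field gives S3 imports a tunable download fan-out with a default of 8 concurrent downloads. A hedged sketch of a caller overriding it (only the members shown in this diff are assumed):

CopyParams copy_params;
copy_params.s3_region = "us-west-2";          // existing S3 option
copy_params.s3_max_concurrent_downloads = 2;  // e.g. limit bandwidth use
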
(The remaining 5 changed files in this commit are not shown here.)
