Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 84 additions & 32 deletions cpp/src/graphar/arrow/chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
seek_id_(0),
schema_(nullptr),
chunk_table_(nullptr),
chunk_cache_(options.cache_capacity),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix,
Expand All @@ -194,6 +195,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
seek_id_(0),
schema_(nullptr),
chunk_table_(nullptr),
chunk_cache_(options.cache_capacity),
filter_options_(options) {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));

Expand All @@ -211,9 +213,8 @@ Status VertexPropertyArrowChunkReader::seek(IdType id) {
IdType pre_chunk_index = chunk_index_;
chunk_index_ = id / vertex_info_->GetChunkSize();
if (chunk_index_ != pre_chunk_index) {
// TODO(@acezen): use a cache to avoid reloading the same chunk, could use
// a LRU cache.
chunk_table_.reset();
auto* cached = chunk_cache_.Get(chunk_index_);
chunk_table_ = cached ? *cached : nullptr;
}
if (chunk_index_ >= chunk_num_) {
return Status::IndexError("Internal vertex id ", id, " is out of range [0,",
Expand Down Expand Up @@ -260,6 +261,7 @@ VertexPropertyArrowChunkReader::GetChunkV2() {
GAR_RETURN_NOT_OK(
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
}
chunk_cache_.Put(chunk_index_, chunk_table_);
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand Down Expand Up @@ -304,6 +306,7 @@ VertexPropertyArrowChunkReader::GetChunkV1() {
GAR_RETURN_NOT_OK(
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
}
chunk_cache_.Put(chunk_index_, chunk_table_);
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand Down Expand Up @@ -340,6 +343,7 @@ VertexPropertyArrowChunkReader::GetLabelChunk() {
// GAR_RETURN_NOT_OK(
// CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
// }
chunk_cache_.Put(chunk_index_, chunk_table_);
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand All @@ -352,17 +356,22 @@ Status VertexPropertyArrowChunkReader::next_chunk() {
vertex_info_->GetType(), " chunk num ", chunk_num_);
}
seek_id_ = chunk_index_ * vertex_info_->GetChunkSize();
chunk_table_.reset();
auto* cached = chunk_cache_.Get(chunk_index_);
chunk_table_ = cached ? *cached : nullptr;

return Status::OK();
}

void VertexPropertyArrowChunkReader::Filter(util::Filter filter) {
filter_options_.filter = filter;
chunk_table_ = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be initialized in this xxxFilter function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, After filter the cache is outdated.

chunk_cache_.Disable();
}

void VertexPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
filter_options_.columns = column_names;
chunk_table_ = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be initialized in this xxxSelect function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

chunk_cache_.Clear();
}

Result<std::shared_ptr<VertexPropertyArrowChunkReader>>
Expand Down Expand Up @@ -517,14 +526,15 @@ VertexPropertyArrowChunkReader::MakeForLabels(

AdjListArrowChunkReader::AdjListArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix)
const std::string& prefix, size_t cache_capacity)
: edge_info_(edge_info),
adj_list_type_(adj_list_type),
prefix_(prefix),
vertex_chunk_index_(0),
chunk_index_(0),
seek_offset_(0),
chunk_table_(nullptr),
chunk_cache_(cache_capacity),
chunk_num_(-1) /* -1 means uninitialized */ {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix,
Expand All @@ -544,6 +554,7 @@ AdjListArrowChunkReader::AdjListArrowChunkReader(
chunk_index_(other.chunk_index_),
seek_offset_(other.seek_offset_),
chunk_table_(nullptr),
chunk_cache_(other.chunk_cache_.capacity()),
vertex_chunk_num_(other.vertex_chunk_num_),
chunk_num_(other.chunk_num_),
base_dir_(other.base_dir_),
Expand Down Expand Up @@ -585,7 +596,9 @@ Status AdjListArrowChunkReader::seek_src(IdType id) {
// initialize or update chunk_num_
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}

if (adj_list_type_ == AdjListType::unordered_by_source) {
Expand Down Expand Up @@ -618,7 +631,9 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) {
// initialize or update chunk_num_
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}

if (adj_list_type_ == AdjListType::unordered_by_dest) {
Expand All @@ -636,7 +651,9 @@ Status AdjListArrowChunkReader::seek(IdType offset) {
IdType pre_chunk_index = chunk_index_;
chunk_index_ = offset / edge_info_->GetChunkSize();
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}
if (chunk_num_ < 0) {
// initialize chunk_num_
Expand Down Expand Up @@ -666,6 +683,8 @@ Result<std::shared_ptr<arrow::Table>> AdjListArrowChunkReader::GetChunk() {
std::string path = prefix_ + chunk_file_path;
auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_),
chunk_table_);
}
IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand All @@ -688,21 +707,26 @@ Status AdjListArrowChunkReader::next_chunk() {
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
return Status::OK();
}

Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index,
IdType chunk_index) {
bool changed = false;
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
vertex_chunk_index_ = vertex_chunk_index;
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
changed = true;
}
if (chunk_index_ != chunk_index) {
if (chunk_index_ != chunk_index || changed) {
chunk_index_ = chunk_index;
seek_offset_ = chunk_index * edge_info_->GetChunkSize();
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}
return Status::OK();
}
Expand All @@ -715,32 +739,35 @@ Result<IdType> AdjListArrowChunkReader::GetRowNumOfChunk() {
std::string path = prefix_ + chunk_file_path;
auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_),
chunk_table_);
}
return chunk_table_->num_rows();
}

Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix) {
const std::string& prefix, size_t cache_capacity) {
if (!edge_info->HasAdjacentListType(adj_list_type)) {
return Status::KeyError(
"The adjacent list type ", AdjListTypeToString(adj_list_type),
" doesn't exist in edge ", edge_info->GetEdgeType(), ".");
}
return std::make_shared<AdjListArrowChunkReader>(edge_info, adj_list_type,
prefix);
prefix, cache_capacity);
}

Result<std::shared_ptr<AdjListArrowChunkReader>> AdjListArrowChunkReader::Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
const std::string& edge_type, const std::string& dst_type,
AdjListType adj_list_type) {
AdjListType adj_list_type, size_t cache_capacity) {
auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
if (!edge_info) {
return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
dst_type, " doesn't exist.");
}
return Make(edge_info, adj_list_type, graph_info->GetPrefix());
return Make(edge_info, adj_list_type, graph_info->GetPrefix(),
cache_capacity);
}

Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {
Expand All @@ -752,13 +779,14 @@ Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() {

AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader(
const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type,
const std::string& prefix)
const std::string& prefix, size_t cache_capacity)
: edge_info_(std::move(edge_info)),
adj_list_type_(adj_list_type),
prefix_(prefix),
chunk_index_(0),
seek_id_(0),
chunk_table_(nullptr) {
chunk_table_(nullptr),
chunk_cache_(cache_capacity) {
std::string base_dir;
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
GAR_ASSIGN_OR_RAISE_ERROR(auto dir_path,
Expand All @@ -785,7 +813,8 @@ Status AdjListOffsetArrowChunkReader::seek(IdType id) {
IdType pre_chunk_index = chunk_index_;
chunk_index_ = id / vertex_chunk_size_;
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
auto* cached = chunk_cache_.Get(chunk_index_);
chunk_table_ = cached ? *cached : nullptr;
}
if (chunk_index_ >= vertex_chunk_num_) {
return Status::IndexError("Internal vertex id ", id, "is out of range [0,",
Expand All @@ -806,6 +835,7 @@ AdjListOffsetArrowChunkReader::GetChunk() {
std::string path = prefix_ + chunk_file_path;
auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType();
GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type));
chunk_cache_.Put(chunk_index_, chunk_table_);
}
IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_;
return chunk_table_->Slice(row_offset)->column(0)->chunk(0);
Expand All @@ -820,35 +850,38 @@ Status AdjListOffsetArrowChunkReader::next_chunk() {
AdjListTypeToString(adj_list_type_), ".");
}
seek_id_ = chunk_index_ * vertex_chunk_size_;
chunk_table_.reset();
auto* cached = chunk_cache_.Get(chunk_index_);
chunk_table_ = cached ? *cached : nullptr;

return Status::OK();
}

Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
AdjListOffsetArrowChunkReader::Make(const std::shared_ptr<EdgeInfo>& edge_info,
AdjListType adj_list_type,
const std::string& prefix) {
const std::string& prefix,
size_t cache_capacity) {
if (!edge_info->HasAdjacentListType(adj_list_type)) {
return Status::KeyError(
"The adjacent list type ", AdjListTypeToString(adj_list_type),
" doesn't exist in edge ", edge_info->GetEdgeType(), ".");
}
return std::make_shared<AdjListOffsetArrowChunkReader>(edge_info,
adj_list_type, prefix);
return std::make_shared<AdjListOffsetArrowChunkReader>(
edge_info, adj_list_type, prefix, cache_capacity);
}

Result<std::shared_ptr<AdjListOffsetArrowChunkReader>>
AdjListOffsetArrowChunkReader::Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
const std::string& edge_type, const std::string& dst_type,
AdjListType adj_list_type) {
AdjListType adj_list_type, size_t cache_capacity) {
auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
if (!edge_info) {
return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
dst_type, " doesn't exist.");
}
return Make(edge_info, adj_list_type, graph_info->GetPrefix());
return Make(edge_info, adj_list_type, graph_info->GetPrefix(),
cache_capacity);
}

AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
Expand All @@ -865,6 +898,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
seek_offset_(0),
schema_(nullptr),
chunk_table_(nullptr),
chunk_cache_(options.cache_capacity),
filter_options_(options),
chunk_num_(-1) /* -1 means uninitialized */ {
GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
Expand All @@ -890,6 +924,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
seek_offset_(other.seek_offset_),
schema_(other.schema_),
chunk_table_(nullptr),
chunk_cache_(other.chunk_cache_.capacity()),
filter_options_(other.filter_options_),
vertex_chunk_num_(other.vertex_chunk_num_),
chunk_num_(other.chunk_num_),
Expand Down Expand Up @@ -935,7 +970,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}

if (adj_list_type_ == AdjListType::unordered_by_source) {
Expand Down Expand Up @@ -967,7 +1004,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) {
if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) {
vertex_chunk_index_ = new_vertex_chunk_index;
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}

if (adj_list_type_ == AdjListType::unordered_by_dest) {
Expand All @@ -985,7 +1024,9 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) {
seek_offset_ = offset;
chunk_index_ = offset / edge_info_->GetChunkSize();
if (chunk_index_ != pre_chunk_index) {
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}
if (chunk_num_ < 0) {
// initialize chunk_num_
Expand Down Expand Up @@ -1024,6 +1065,8 @@ AdjListPropertyArrowChunkReader::GetChunk() {
GAR_RETURN_NOT_OK(
CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
}
chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_),
chunk_table_);
}
IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
return chunk_table_->Slice(row_offset);
Expand All @@ -1049,31 +1092,40 @@ Status AdjListPropertyArrowChunkReader::next_chunk() {
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
}
seek_offset_ = chunk_index_ * edge_info_->GetChunkSize();
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
return Status::OK();
}

Status AdjListPropertyArrowChunkReader::seek_chunk_index(
IdType vertex_chunk_index, IdType chunk_index) {
bool changed = false;
if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) {
vertex_chunk_index_ = vertex_chunk_index;
GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum());
chunk_table_.reset();
changed = true;
}
if (chunk_index_ != chunk_index) {
if (chunk_index_ != chunk_index || changed) {
chunk_index_ = chunk_index;
seek_offset_ = chunk_index * edge_info_->GetChunkSize();
chunk_table_.reset();
auto key = std::make_pair(vertex_chunk_index_, chunk_index_);
auto* cached = chunk_cache_.Get(key);
chunk_table_ = cached ? *cached : nullptr;
}
return Status::OK();
}

void AdjListPropertyArrowChunkReader::Filter(util::Filter filter) {
filter_options_.filter = filter;
chunk_table_ = nullptr;
chunk_cache_.Disable();
}

void AdjListPropertyArrowChunkReader::Select(util::ColumnNames column_names) {
filter_options_.columns = column_names;
chunk_table_ = nullptr;
chunk_cache_.Clear();
}

Result<std::shared_ptr<AdjListPropertyArrowChunkReader>>
Expand Down
Loading
Loading