
Commit

Use Arrow's ThreadPool to load record batch (milvus-io#73)
eddyxu authored Aug 4, 2022
1 parent 3264363 commit 60b57c4
Showing 7 changed files with 73 additions and 24 deletions.
cpp/src/lance/arrow/file_lance.cc (4 changes: 3 additions & 1 deletion)
@@ -15,6 +15,7 @@
#include "lance/arrow/file_lance.h"

#include <arrow/dataset/file_base.h>
#include <arrow/util/thread_pool.h>
#include <fmt/format.h>

#include <memory>
@@ -72,7 +73,8 @@ ::arrow::Result<::arrow::RecordBatchGenerator> LanceFileFormat::ScanBatchesAsync
offset = lance_fragment_scan_options->offset;
}

auto batch_reader = lance::io::RecordBatchReader(reader, options, limit, offset);
auto batch_reader = lance::io::RecordBatchReader(
reader, options, ::arrow::internal::GetCpuThreadPool(), limit, offset);
ARROW_RETURN_NOT_OK(batch_reader.Open());
auto generator = ::arrow::RecordBatchGenerator(std::move(batch_reader));
return generator;
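For context, here is a minimal standalone sketch (not part of this commit) of the Arrow thread-pool API the change relies on: ::arrow::internal::GetCpuThreadPool() returns the process-wide CPU pool, and ThreadPool::Submit() runs a callable on a pool thread and hands back a Result wrapping a Future.

// Illustrative sketch only: obtain Arrow's global CPU thread pool, submit a
// task, and block on the returned Future. Assumes Arrow C++ 8.x headers.
#include <arrow/result.h>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

#include <cstdio>

int main() {
  // Process-wide pool shared by Arrow's compute and dataset code; its size
  // can be adjusted with ::arrow::SetCpuThreadPoolCapacity().
  ::arrow::internal::ThreadPool* pool = ::arrow::internal::GetCpuThreadPool();

  // Submit() returns ::arrow::Result<::arrow::Future<T>>; the callable runs
  // on a pool thread.
  auto maybe_future = pool->Submit([](int x) { return x * 2; }, 21);
  if (!maybe_future.ok()) {
    std::fprintf(stderr, "%s\n", maybe_future.status().ToString().c_str());
    return 1;
  }

  // result() blocks until the task finishes and yields a Result<int>.
  std::printf("answer = %d\n", maybe_future.ValueOrDie().result().ValueOrDie());
  return 0;
}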
cpp/src/lance/io/record_batch_reader.cc (56 changes: 45 additions & 11 deletions)
@@ -18,11 +18,11 @@
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>
#include <fmt/format.h>
#include <fmt/ranges.h>

#include <algorithm>
#include <future>
#include <tuple>

#include "lance/format/metadata.h"
@@ -36,9 +36,16 @@ namespace lance::io {

RecordBatchReader::RecordBatchReader(std::shared_ptr<FileReader> reader,
std::shared_ptr<arrow::dataset::ScanOptions> options,
::arrow::internal::ThreadPool* thread_pool,
std::optional<int64_t> limit,
int64_t offset) noexcept
: reader_(reader), options_(options), limit_(limit), offset_(offset) {}
: reader_(reader),
options_(options),
limit_(limit),
offset_(offset),
thread_pool_(thread_pool) {
assert(thread_pool_);
}

RecordBatchReader::RecordBatchReader(const RecordBatchReader& other) noexcept
: reader_(other.reader_),
@@ -47,6 +54,7 @@ RecordBatchReader::RecordBatchReader(const RecordBatchReader& other) noexcept
offset_(other.offset_),
schema_(other.schema_),
project_(other.project_),
thread_pool_(other.thread_pool_),
current_batch_(int(other.current_batch_)) {}

RecordBatchReader::RecordBatchReader(RecordBatchReader&& other) noexcept
@@ -56,7 +64,9 @@ RecordBatchReader::RecordBatchReader(RecordBatchReader&& other) noexcept
offset_(other.offset_),
schema_(std::move(other.schema_)),
project_(std::move(other.project_)),
current_batch_(int(other.current_batch_)) {}
thread_pool_(std::move(other.thread_pool_)),
current_batch_(int(other.current_batch_)),
readahead_queue_(std::move(other.readahead_queue_)) {}

::arrow::Status RecordBatchReader::Open() {
schema_ = std::make_shared<lance::format::Schema>(reader_->schema());
@@ -70,19 +80,43 @@ std::shared_ptr<::arrow::Schema> RecordBatchReader::schema() const {

::arrow::Status RecordBatchReader::ReadNext(std::shared_ptr<::arrow::RecordBatch>* batch) {
int32_t batch_id = current_batch_++;
if (batch_id < reader_->metadata().num_batches()) {
ARROW_ASSIGN_OR_RAISE(auto batch_read, project_->Execute(reader_, batch_id));
if (batch_read) {
*batch = std::move(batch_read);
}
ARROW_ASSIGN_OR_RAISE(auto batch_read, ReadBatch(batch_id));
if (batch_read) {
*batch = std::move(batch_read);
}
return ::arrow::Status::OK();
}

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> RecordBatchReader::ReadBatch(
int32_t batch_id) const {
if (batch_id < reader_->metadata().num_batches()) {
return project_->Execute(reader_, batch_id);
}
return nullptr;
}

::arrow::Future<std::shared_ptr<::arrow::RecordBatch>> RecordBatchReader::operator()() {
/// TODO: Make it truly async someday.
auto f = ::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>::MakeFinished(this->Next());
return f;
int total_batches = reader_->metadata().num_batches();
while (static_cast<int32_t>(readahead_queue_.size()) < options_->batch_readahead &&
current_batch_ < total_batches) {
int32_t batch_id = current_batch_++;
auto result = thread_pool_->Submit(
[&](int32_t batch_id) {
return ::arrow::Future<std::shared_ptr<arrow::RecordBatch>>::MakeFinished(
this->ReadBatch(batch_id));
},
batch_id);
if (!result.ok()) {
return result.status();
}
readahead_queue_.emplace(std::move(result.ValueOrDie()));
}
if (readahead_queue_.empty()) {
return std::shared_ptr<::arrow::RecordBatch>(nullptr);
}
auto first = readahead_queue_.front();
readahead_queue_.pop();
return first;
}

} // namespace lance::io
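The rewritten operator()() above turns the reader into an async record-batch generator: each call first tops up a queue of in-flight futures, bounded by ScanOptions::batch_readahead, then yields the oldest one, so batches decode on the CPU pool while the consumer drains them in submission order. Below is a simplified, hypothetical sketch of that readahead pattern; the class name ReadaheadGenerator and the LoadItem() helper are illustrative only, and a plain int stands in for a record batch.

// Simplified readahead sketch (not the commit's code): keep up to `readahead`
// tasks in flight on an Arrow thread pool and return results in order.
#include <arrow/result.h>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

#include <queue>

class ReadaheadGenerator {
 public:
  ReadaheadGenerator(int total, int readahead, ::arrow::internal::ThreadPool* pool)
      : total_(total), readahead_(readahead), pool_(pool) {}

  // Each call refills the in-flight queue, then yields the oldest future.
  ::arrow::Future<int> operator()() {
    while (static_cast<int>(queue_.size()) < readahead_ && next_ < total_) {
      int id = next_++;
      auto submitted = pool_->Submit([id] { return LoadItem(id); });
      if (!submitted.ok()) {
        return submitted.status();  // Converted into a failed future.
      }
      queue_.push(std::move(submitted.ValueOrDie()));
    }
    if (queue_.empty()) {
      return ::arrow::Future<int>::MakeFinished(-1);  // End-of-stream sentinel.
    }
    auto front = queue_.front();
    queue_.pop();
    return front;
  }

 private:
  // Stand-in for the per-batch decode work (hypothetical helper).
  static int LoadItem(int id) { return id * id; }

  int total_;
  int readahead_;
  int next_ = 0;
  ::arrow::internal::ThreadPool* pool_;
  std::queue<::arrow::Future<int>> queue_;
};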
cpp/src/lance/io/record_batch_reader.h (6 changes: 6 additions & 0 deletions)
@@ -16,6 +16,7 @@

#include <arrow/record_batch.h>
#include <arrow/type_fwd.h>
#include <arrow/util/thread_pool.h>

#include <atomic>
#include <future>
@@ -42,6 +43,7 @@ class RecordBatchReader : ::arrow::RecordBatchReader {
/// Constructor.
RecordBatchReader(std::shared_ptr<FileReader> reader,
std::shared_ptr<::arrow::dataset::ScanOptions> options,
::arrow::internal::ThreadPool* thread_pool_,
std::optional<int64_t> limit = std::nullopt,
int64_t offset = 0) noexcept;

@@ -66,6 +68,8 @@ class RecordBatchReader : ::arrow::RecordBatchReader {
private:
RecordBatchReader() = delete;

::arrow::Result<std::shared_ptr<::arrow::RecordBatch>> ReadBatch(int32_t batch_id) const;

std::shared_ptr<FileReader> reader_;
std::shared_ptr<::arrow::dataset::ScanOptions> options_;
std::optional<int64_t> limit_ = std::nullopt;
@@ -74,7 +78,9 @@ class RecordBatchReader : ::arrow::RecordBatchReader {
/// Projection over the dataset.
std::shared_ptr<Project> project_;

::arrow::internal::ThreadPool* thread_pool_;
std::atomic_int32_t current_batch_ = 0;
std::queue<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>> readahead_queue_;
};

} // namespace lance::io
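As a usage note (not from this repository), the class doubles as an Arrow async generator: ScanBatchesAsync() wraps it in an ::arrow::RecordBatchGenerator, which a caller can drain by repeatedly invoking it and waiting on each future until a null batch signals the end. A hedged sketch of such a consumer, with the DrainGenerator() name being purely illustrative:

// Hypothetical consumer of an ::arrow::RecordBatchGenerator such as the one
// produced by LanceFileFormat::ScanBatchesAsync().
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/type_fwd.h>
#include <arrow/util/future.h>

#include <cstdio>
#include <memory>

::arrow::Status DrainGenerator(::arrow::RecordBatchGenerator gen) {
  while (true) {
    // Each invocation schedules (or returns) the next readahead batch.
    auto fut = gen();
    ARROW_ASSIGN_OR_RAISE(auto batch, fut.result());
    if (batch == nullptr) {
      break;  // Generator exhausted.
    }
    std::printf("read batch with %lld rows\n",
                static_cast<long long>(batch->num_rows()));
  }
  return ::arrow::Status::OK();
}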
python/setup.py (2 changes: 1 addition & 1 deletion)
@@ -62,7 +62,7 @@
long_description="",
ext_modules=cythonize(extensions, language_level="3"),
zip_safe=False,
install_requires=["pyarrow"],
install_requires=["pyarrow>=8,<9"],
extras_require={"test": ["pytest>=6.0", "pandas", "duckdb"]},
python_requires=">=3.8",
packages=find_packages(),
python/tools/Dockerfile.manylinux2014 (21 changes: 14 additions & 7 deletions)
@@ -2,11 +2,18 @@ FROM quay.io/pypa/manylinux2014_x86_64

ENV LD_LIBRARY_PATH=/usr/local/lib

ENV ARROW_VERSION=8.0.0-1.el7.x86_64

RUN yum update -y \
&& yum install -y epel-release || yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-$(cut -d: -f5 /etc/system-release-cpe | cut -d. -f1).noarch.rpm \
&& yum install -y https://apache.jfrog.io/artifactory/arrow/centos/$(cut -d: -f5 /etc/system-release-cpe | cut -d. -f1)/apache-arrow-release-latest.rpm \
&& yum install -y --enablerepo=epel arrow-devel \
arrow-dataset-devel arrow-python-devel parquet-devel wget \
ftp://ftp.pbone.net/mirror/ftp.centos.org/7.9.2009/cloud/x86_64/openstack-train/Packages/p/protobuf-devel-3.6.1-4.el7.x86_64.rpm \
ftp://ftp.pbone.net/mirror/vault.centos.org/7.8.2003/cloud/x86_64/openstack-train/Packages/p/protobuf-compiler-3.6.1-4.el7.x86_64.rpm \
ftp://ftp.pbone.net/mirror/vault.centos.org/7.8.2003/cloud/x86_64/openstack-train/Packages/p/protobuf-3.6.1-4.el7.x86_64.rpm
&& yum install -y epel-release || yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-$(cut -d: -f5 /etc/system-release-cpe | cut -d. -f1).noarch.rpm \
&& yum install -y https://apache.jfrog.io/artifactory/arrow/centos/7/x86_64/Packages/apache-arrow-release-8.0.0-1.el7.noarch.rpm \
&& yum install -y --enablerepo=epel \
arrow-python-devel-${ARROW_VERSION} \
arrow-devel-${ARROW_VERSION} \
arrow-glib-devel-${ARROW_VERSION} \
arrow-dataset-devel-${ARROW_VERSION} \
parquet-devel-${ARROW_VERSION} \
wget \
ftp://ftp.pbone.net/mirror/ftp.centos.org/7.9.2009/cloud/x86_64/openstack-train/Packages/p/protobuf-devel-3.6.1-4.el7.x86_64.rpm \
ftp://ftp.pbone.net/mirror/vault.centos.org/7.8.2003/cloud/x86_64/openstack-train/Packages/p/protobuf-compiler-3.6.1-4.el7.x86_64.rpm \
ftp://ftp.pbone.net/mirror/vault.centos.org/7.8.2003/cloud/x86_64/openstack-train/Packages/p/protobuf-3.6.1-4.el7.x86_64.rpm
python/tools/auditwheel (6 changes: 3 additions & 3 deletions)
@@ -6,8 +6,8 @@ import sys
from auditwheel.main import main
from auditwheel.policy import _POLICIES as POLICIES

for p in POLICIES:
p['lib_whitelist'].extend(['libarrow.so.800', 'libarrow_dataset.so.800'])
#for p in POLICIES:
# p['lib_whitelist'].extend(['libarrow.so.800', 'libarrow_dataset.so.800'])

if __name__ == "__main__":
sys.exit(main())
sys.exit(main())
python/tools/build_manylinux_wheels.sh (2 changes: 1 addition & 1 deletion)
@@ -23,7 +23,7 @@ pushd /code/python
rm -rf wheels dist build
for py in "${py_versions[@]}"
do
/opt/python/${py}-${py}/bin/pip install numpy pyarrow cython
/opt/python/${py}-${py}/bin/pip install numpy "pyarrow<9" cython
/opt/python/${py}-${py}/bin/python setup.py bdist_wheel
done
