Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif ()

message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")

# Add common flags
set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")

Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(ARROW_IO_TEST_LINK_LIBS

set(ARROW_IO_SRCS
file.cc
interfaces.cc
memory.cc
)

Expand Down
10 changes: 1 addition & 9 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,7 @@ Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
return impl_->Read(nbytes, bytes_read, out);
}

Status ReadableFile::ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
RETURN_NOT_OK(Seek(position));
return impl_->Read(nbytes, bytes_read, out);
}

Status ReadableFile::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
RETURN_NOT_OK(Seek(position));
Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
return impl_->ReadBuffer(nbytes, out);
}

Expand Down
6 changes: 2 additions & 4 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,9 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
Status Close() override;
Status Tell(int64_t* position) override;

Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status GetSize(int64_t* size) override;
Status Seek(int64_t position) override;

Expand Down
46 changes: 42 additions & 4 deletions cpp/src/arrow/io/hdfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <string>

#include "arrow/io/hdfs.h"
#include "arrow/util/buffer.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {
Expand Down Expand Up @@ -89,7 +91,7 @@ class HdfsAnyFileImpl {
// Private implementation for read-only files
class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
public:
HdfsReadableFileImpl() {}
explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {}

Status Close() {
if (is_open_) {
Expand All @@ -108,13 +110,39 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
return Status::OK();
}

Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
auto buffer = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(buffer->Resize(nbytes));

int64_t bytes_read = 0;
RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));

if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }

*out = buffer;
return Status::OK();
}

Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
RETURN_NOT_OK(CheckReadResult(ret));
*bytes_read = ret;
return Status::OK();
}

Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
auto buffer = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(buffer->Resize(nbytes));

int64_t bytes_read = 0;
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));

if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }

*out = buffer;
return Status::OK();
}

Status GetSize(int64_t* size) {
hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str());
if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
Expand All @@ -123,10 +151,16 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
hdfsFreeFileInfo(entry, 1);
return Status::OK();
}

void set_memory_pool(MemoryPool* pool) { pool_ = pool; }

private:
MemoryPool* pool_;
};

HdfsReadableFile::HdfsReadableFile() {
impl_.reset(new HdfsReadableFileImpl());
HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
if (pool == nullptr) { pool = default_memory_pool(); }
impl_.reset(new HdfsReadableFileImpl(pool));
}

HdfsReadableFile::~HdfsReadableFile() {
Expand All @@ -144,7 +178,7 @@ Status HdfsReadableFile::ReadAt(

Status HdfsReadableFile::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::NotImplemented("Not yet implemented");
return impl_->ReadAt(position, nbytes, out);
}

bool HdfsReadableFile::supports_zero_copy() const {
Expand All @@ -155,6 +189,10 @@ Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buff
return impl_->Read(nbytes, bytes_read, buffer);
}

Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* buffer) {
return impl_->Read(nbytes, buffer);
}

Status HdfsReadableFile::GetSize(int64_t* size) {
return impl_->GetSize(size);
}
Expand Down
13 changes: 9 additions & 4 deletions cpp/src/arrow/io/hdfs.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {

Status GetSize(int64_t* size) override;

// NOTE: If you wish to read a particular range of a file in a multithreaded
// context, you may prefer to use ReadAt to avoid locking issues
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;

Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;

Expand All @@ -174,17 +180,16 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;

// NOTE: If you wish to read a particular range of a file in a multithreaded
// context, you may prefer to use ReadAt to avoid locking issues
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
void set_memory_pool(MemoryPool* pool);

private:
explicit HdfsReadableFile(MemoryPool* pool = nullptr);

class ARROW_NO_EXPORT HdfsReadableFileImpl;
std::unique_ptr<HdfsReadableFileImpl> impl_;

friend class HdfsClient::HdfsClientImpl;

HdfsReadableFile();
DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
};

Expand Down
48 changes: 48 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/io/interfaces.h"

#include <cstdint>
#include <memory>

#include "arrow/util/buffer.h"
#include "arrow/util/status.h"

namespace arrow {
namespace io {

FileInterface::~FileInterface() {}

ReadableFileInterface::ReadableFileInterface() {
set_mode(FileMode::READ);
}

Status ReadableFileInterface::ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status ReadableFileInterface::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

} // namespace io
} // namespace arrow
26 changes: 15 additions & 11 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
#include <memory>

#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

namespace arrow {

class Buffer;
class MemoryPool;
class Status;

namespace io {
Expand All @@ -43,9 +45,9 @@ class FileSystemClient {
virtual ~FileSystemClient() {}
};

class FileInterface {
class ARROW_EXPORT FileInterface {
public:
virtual ~FileInterface() {}
virtual ~FileInterface() = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually implemented interfaces.cc so it should not be deleted here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this on purpose — I had to do some wrangling with the class hierarchy when I added default virtual implementations (which can be overridden, like they are for HDFS) of ReadAt

virtual Status Close() = 0;
virtual Status Tell(int64_t* position) = 0;

Expand All @@ -54,7 +56,6 @@ class FileInterface {
protected:
FileInterface() {}
FileMode::type mode_;

void set_mode(FileMode::type mode) { mode_ = mode; }

private:
Expand All @@ -74,6 +75,9 @@ class Writeable {
class Readable {
public:
virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;

// Does not copy if not necessary
virtual Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
};

class OutputStream : public FileInterface, public Writeable {
Expand All @@ -86,21 +90,21 @@ class InputStream : public FileInterface, public Readable {
InputStream() {}
};

class ReadableFileInterface : public InputStream, public Seekable {
class ARROW_EXPORT ReadableFileInterface : public InputStream, public Seekable {
public:
virtual Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;

virtual Status GetSize(int64_t* size) = 0;

// Does not copy if not necessary
virtual bool supports_zero_copy() const = 0;

// Read at position, provide default implementations using Read(...), but can
// be overridden
virtual Status ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out);

virtual bool supports_zero_copy() const = 0;
virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out);

protected:
ReadableFileInterface() { set_mode(FileMode::READ); }
ReadableFileInterface();
};

class WriteableFileInterface : public OutputStream, public Seekable {
Expand Down
40 changes: 16 additions & 24 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
ReadableFileInterface::set_mode(mode);
}

MemoryMappedFile::~MemoryMappedFile() {}

Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out) {
std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode));
Expand Down Expand Up @@ -161,16 +163,8 @@ Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
return Status::OK();
}

Status MemoryMappedFile::ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
RETURN_NOT_OK(impl_->Seek(position));
return Read(nbytes, bytes_read, out);
}

Status MemoryMappedFile::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
nbytes = std::min(nbytes, impl_->size() - position);
RETURN_NOT_OK(impl_->Seek(position));
Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
nbytes = std::min(nbytes, impl_->size() - impl_->position());
*out = std::make_shared<Buffer>(impl_->head(), nbytes);
impl_->advance(nbytes);
return Status::OK();
Expand Down Expand Up @@ -246,6 +240,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
// ----------------------------------------------------------------------
// In-memory buffer reader

BufferReader::BufferReader(const uint8_t* buffer, int buffer_size)
: buffer_(buffer), buffer_size_(buffer_size), position_(0) {}

BufferReader::~BufferReader() {}

Status BufferReader::Close() {
// no-op
return Status::OK();
Expand All @@ -256,20 +255,6 @@ Status BufferReader::Tell(int64_t* position) {
return Status::OK();
}

Status BufferReader::ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, buffer);
}

Status BufferReader::ReadAt(
int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
int64_t size = std::min(nbytes, buffer_size_ - position_);
*out = std::make_shared<Buffer>(buffer_ + position, size);
position_ += nbytes;
return Status::OK();
}

bool BufferReader::supports_zero_copy() const {
return true;
}
Expand All @@ -281,6 +266,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer)
return Status::OK();
}

Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
int64_t size = std::min(nbytes, buffer_size_ - position_);
*out = std::make_shared<Buffer>(buffer_ + position_, size);
position_ += nbytes;
return Status::OK();
}

Status BufferReader::GetSize(int64_t* size) {
*size = buffer_size_;
return Status::OK();
Expand Down
Loading