Skip to content

Commit

Permalink
Hide more mutexes from public headers
Browse files Browse the repository at this point in the history
Change-Id: I41be2f646135db8bab11ce171ee547b0e09e8d85
  • Loading branch information
wesm committed Oct 2, 2017
1 parent ba3db15 commit 733eefa
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 200 deletions.
15 changes: 7 additions & 8 deletions cpp/src/arrow/allocator-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,16 @@ TEST(stl_allocator, FreeLargeMemory) {
}

TEST(stl_allocator, MaxMemory) {
DefaultMemoryPool pool;
auto pool = default_memory_pool();

ASSERT_EQ(0, pool.max_memory());
stl_allocator<uint8_t> alloc(&pool);
uint8_t* data = alloc.allocate(100);
uint8_t* data2 = alloc.allocate(100);
stl_allocator<uint8_t> alloc(pool);
uint8_t* data = alloc.allocate(1000);
uint8_t* data2 = alloc.allocate(1000);

alloc.deallocate(data, 100);
alloc.deallocate(data2, 100);
alloc.deallocate(data, 1000);
alloc.deallocate(data2, 1000);

ASSERT_EQ(200, pool.max_memory());
ASSERT_EQ(2000, pool->max_memory());
}

#endif // ARROW_VALGRIND
Expand Down
36 changes: 34 additions & 2 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ class OSFile {

FileMode::type mode() const { return mode_; }

std::mutex& lock() { return lock_; }

protected:
Status SetFileName(const std::string& file_name) {
#if defined(_MSC_VER)
Expand Down Expand Up @@ -461,6 +463,20 @@ Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
return impl_->Read(nbytes, bytes_read, out);
}

Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) {
std::lock_guard<std::mutex> guard(impl_->lock());
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
std::lock_guard<std::mutex> guard(impl_->lock());
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
return impl_->ReadBuffer(nbytes, out);
}
Expand Down Expand Up @@ -590,6 +606,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {

int fd() const { return file_->fd(); }

std::mutex& lock() { return file_->lock(); }

private:
std::unique_ptr<OSFile> file_;
int64_t position_;
Expand Down Expand Up @@ -671,10 +689,24 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::OK();
}

Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) {
std::lock_guard<std::mutex> guard(memory_map_->lock());
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
std::lock_guard<std::mutex> guard(memory_map_->lock());
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

bool MemoryMappedFile::supports_zero_copy() const { return true; }

Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
std::lock_guard<std::mutex> guard(memory_map_->lock());

if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
Expand All @@ -685,7 +717,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t
}

Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
std::lock_guard<std::mutex> guard(memory_map_->lock());

if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile {
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) override;

/// Default implementation is thread-safe
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status GetSize(int64_t* size) override;
Status Seek(int64_t position) override;

Expand Down Expand Up @@ -139,6 +145,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
// Zero copy read. Not thread-safe
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) override;

/// Default implementation is thread-safe
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

bool supports_zero_copy() const override;

/// Write data at the current position in the file. Thread-safe
Expand Down
14 changes: 0 additions & 14 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,6 @@ FileInterface::~FileInterface() {}

RandomAccessFile::RandomAccessFile() { set_mode(FileMode::READ); }

Status RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

Status Writeable::Write(const std::string& data) {
return Write(reinterpret_cast<const uint8_t*>(data.c_str()),
static_cast<int64_t>(data.size()));
Expand Down
10 changes: 3 additions & 7 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -133,16 +132,13 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
///
/// Default implementation is thread-safe
virtual Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out);
uint8_t* out) = 0;

/// Default implementation is thread-safe
virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out);

std::mutex& lock() { return lock_; }
virtual Status ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) = 0;

protected:
std::mutex lock_;

RandomAccessFile();
};

Expand Down
163 changes: 77 additions & 86 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <mutex>

#include "arrow/buffer.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -127,118 +128,96 @@ static constexpr int kMemcopyDefaultNumThreads = 1;
static constexpr int64_t kMemcopyDefaultBlocksize = 64;
static constexpr int64_t kMemcopyDefaultThreshold = 1024 * 1024;

class FixedSizeBufferWriter::Impl {
public:
class FixedSizeBufferWriter::FixedSizeBufferWriterImpl {
public:
/// Input buffer must be mutable, will abort if not
explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
~FixedSizeBufferWriter();

Status Close() override;
Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;
Status Write(const uint8_t* data, int64_t nbytes) override;
Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;

void set_memcopy_threads(int num_threads);
void set_memcopy_blocksize(int64_t blocksize);
void set_memcopy_threshold(int64_t threshold);
private:
std::mutex lock_;
std::shared_ptr<Buffer> buffer_;
uint8_t* mutable_data_;
int64_t size_;
int64_t position_;

int memcopy_num_threads_;
int64_t memcopy_blocksize_;
int64_t memcopy_threshold_;

};

/// Input buffer must be mutable, will abort if not
FixedSizeBufferWriter::Impl::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
: memcopy_num_threads_(kMemcopyDefaultNumThreads),
memcopy_blocksize_(kMemcopyDefaultBlocksize),
memcopy_threshold_(kMemcopyDefaultThreshold) {
buffer_ = buffer;
DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
mutable_data_ = buffer->mutable_data();
size_ = buffer->size();
position_ = 0;
}
/// Input buffer must be mutable, will abort if not
explicit FixedSizeBufferWriterImpl(const std::shared_ptr<Buffer>& buffer)
: memcopy_num_threads_(kMemcopyDefaultNumThreads),
memcopy_blocksize_(kMemcopyDefaultBlocksize),
memcopy_threshold_(kMemcopyDefaultThreshold) {
buffer_ = buffer;
DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
mutable_data_ = buffer->mutable_data();
size_ = buffer->size();
position_ = 0;
}

FixedSizeBufferWriter::Impl::~FixedSizeBufferWriter() {}
~FixedSizeBufferWriterImpl() {}

Status FixedSizeBufferWriter::Close() {
// No-op
return Status::OK();
}
Status Close() {
// No-op
return Status::OK();
}

Status FixedSizeBufferWriter::Impl::Seek(int64_t position) {
if (position < 0 || position >= size_) {
return Status::IOError("position out of bounds");
Status Seek(int64_t position) {
if (position < 0 || position >= size_) {
return Status::IOError("position out of bounds");
}
position_ = position;
return Status::OK();
}
position_ = position;
return Status::OK();
}

Status FixedSizeBufferWriter::Impl::Tell(int64_t* position) {
*position = position_;
return Status::OK();
}
Status Tell(int64_t* position) {
*position = position_;
return Status::OK();
}

Status FixedSizeBufferWriter::Impl::Write(const uint8_t* data, int64_t nbytes) {
if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) {
internal::parallel_memcopy(mutable_data_ + position_, data, nbytes,
memcopy_blocksize_, memcopy_num_threads_);
Status Write(const uint8_t* data, int64_t nbytes) {
if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) {
internal::parallel_memcopy(mutable_data_ + position_, data, nbytes,
memcopy_blocksize_, memcopy_num_threads_);
} else {
memcpy(mutable_data_ + position_, data, nbytes);
}
position_ += nbytes;
return Status::OK();
}
else {
memcpy(mutable_data_ + position_, data, nbytes);

Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Write(data, nbytes);
}
position_ += nbytes;
return Status::OK();
}

Status FixedSizeBufferWriter::Impl::WriteAt(int64_t position, const uint8_t* data,
int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Write(data, nbytes);
}
void set_memcopy_threads(int num_threads) { memcopy_num_threads_ = num_threads; }

void FixedSizeBufferWriter::Impl::set_memcopy_threads(int num_threads) {
memcopy_num_threads_ = num_threads;
}
void set_memcopy_blocksize(int64_t blocksize) { memcopy_blocksize_ = blocksize; }

void FixedSizeBufferWriter::Impl::set_memcopy_blocksize(int64_t blocksize) {
memcopy_blocksize_ = blocksize;
}
void set_memcopy_threshold(int64_t threshold) { memcopy_threshold_ = threshold; }

void FixedSizeBufferWriter::Impl::set_memcopy_threshold(int64_t threshold) {
memcopy_threshold_ = threshold;
}
private:
std::mutex lock_;
std::shared_ptr<Buffer> buffer_;
uint8_t* mutable_data_;
int64_t size_;
int64_t position_;

int memcopy_num_threads_;
int64_t memcopy_blocksize_;
int64_t memcopy_threshold_;
};

FixedSizeBufferWriter::~FixedSizeBufferWriter() {}

FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
: impl_(buffer) {};
: impl_(new FixedSizeBufferWriterImpl(buffer)) {}

Status FixedSizeBufferWriter::Close() {
return impl_->Close();
}
Status FixedSizeBufferWriter::Close() { return impl_->Close(); }

Status FixedSizeBufferWriter::Seek(int64_t position) {
return impl_->Seek(position);
}
Status FixedSizeBufferWriter::Seek(int64_t position) { return impl_->Seek(position); }

Status FixedSizeBufferWriter::Tell(int64_t* position) {
Status FixedSizeBufferWriter::Tell(int64_t* position) const {
return impl_->Tell(position);
}

Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
return impl_->Write(date, nbytes);
return impl_->Write(data, nbytes);
}

Status FixedSizeBufferWriter::WriteAt(int64_t position, const uint8_t* data,
int64_t nbytes) {
int64_t nbytes) {
return impl_->WriteAt(position, data, nbytes);
}

Expand Down Expand Up @@ -297,6 +276,18 @@ Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::OK();
}

Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) {
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

Status BufferReader::GetSize(int64_t* size) {
*size = size_;
return Status::OK();
Expand Down
Loading

0 comments on commit 733eefa

Please sign in to comment.