Skip to content

Commit

Permalink
GH-39270: [C++] Avoid creating memory manager instance for every buff…
Browse files Browse the repository at this point in the history
…er view/copy (#39271)

### Rationale for this change

We can use `arrow::default_cpu_memory_manager()` for `default_memory_pool()`.

### What changes are included in this PR?

Check the given `pool` and use `arrow::default_cpu_memory_manager()` if it's `arrow::default_memory_pool()`.

This also caches `arrow::CPUDevice::memory_manager()` result to avoid calling it multiple times. Note that we can avoid creating needless memory manager instance without this. This just avoid calling it multiple times.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

No.
* Closes: #39270

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
kou authored Jan 11, 2024
1 parent 30c4e15 commit 2132cb3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
6 changes: 5 additions & 1 deletion cpp/src/arrow/device.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,11 @@ bool CPUDevice::Equals(const Device& other) const {
}

std::shared_ptr<MemoryManager> CPUDevice::memory_manager(MemoryPool* pool) {
return CPUMemoryManager::Make(Instance(), pool);
if (pool == default_memory_pool()) {
return default_cpu_memory_manager();
} else {
return CPUMemoryManager::Make(Instance(), pool);
}
}

std::shared_ptr<MemoryManager> CPUDevice::default_memory_manager() {
Expand Down
20 changes: 9 additions & 11 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
MemoryPool* pool, bool skip_body)
: listener_(std::move(listener)),
pool_(pool),
memory_manager_(CPUDevice::memory_manager(pool_)),
state_(initial_state),
next_required_size_(initial_next_required_size),
chunks_(),
Expand Down Expand Up @@ -822,8 +823,7 @@ class MessageDecoder::MessageDecoderImpl {
if (buffer->is_cpu()) {
metadata_ = buffer;
} else {
ARROW_ASSIGN_OR_RAISE(metadata_,
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(metadata_, Buffer::ViewOrCopy(buffer, memory_manager_));
}
return ConsumeMetadata();
}
Expand All @@ -834,16 +834,15 @@ class MessageDecoder::MessageDecoderImpl {
if (chunks_[0]->is_cpu()) {
metadata_ = std::move(chunks_[0]);
} else {
ARROW_ASSIGN_OR_RAISE(
metadata_,
Buffer::ViewOrCopy(chunks_[0], CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(metadata_,
Buffer::ViewOrCopy(chunks_[0], memory_manager_));
}
chunks_.erase(chunks_.begin());
} else {
metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
if (!chunks_[0]->is_cpu()) {
ARROW_ASSIGN_OR_RAISE(
metadata_, Buffer::ViewOrCopy(metadata_, CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(metadata_,
Buffer::ViewOrCopy(metadata_, memory_manager_));
}
chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
}
Expand Down Expand Up @@ -911,8 +910,7 @@ class MessageDecoder::MessageDecoderImpl {
if (buffer->is_cpu()) {
return util::SafeLoadAs<int32_t>(buffer->data());
} else {
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer, Buffer::ViewOrCopy(buffer, memory_manager_));
return util::SafeLoadAs<int32_t>(cpu_buffer->data());
}
}
Expand All @@ -924,8 +922,7 @@ class MessageDecoder::MessageDecoderImpl {
std::shared_ptr<Buffer> last_chunk;
for (auto& chunk : chunks_) {
if (!chunk->is_cpu()) {
ARROW_ASSIGN_OR_RAISE(
chunk, Buffer::ViewOrCopy(chunk, CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(chunk, Buffer::ViewOrCopy(chunk, memory_manager_));
}
auto data = chunk->data();
auto data_size = chunk->size();
Expand All @@ -951,6 +948,7 @@ class MessageDecoder::MessageDecoderImpl {

std::shared_ptr<MessageDecoderListener> listener_;
MemoryPool* pool_;
std::shared_ptr<MemoryManager> memory_manager_;
State state_;
int64_t next_required_size_;
std::vector<std::shared_ptr<Buffer>> chunks_;
Expand Down

0 comments on commit 2132cb3

Please sign in to comment.