Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions velox/common/memory/MemoryPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
parent_, "Only root memory pool allows to set high-usage callback");
highUsageCallback_ = func;
}

/// TODO: deprecate this after the integration with memory arbitrator.
using GrowCallback = std::function<bool(int64_t size, MemoryPool& pool)>;
virtual void setGrowCallback(GrowCallback func) {
Expand Down Expand Up @@ -509,7 +509,7 @@ class MemoryPoolImpl : public MemoryPool {

int64_t capacity() const override;

bool highUsage() override;
bool highUsage() override;

int64_t getCurrentBytes() const override {
std::lock_guard<std::mutex> l(mutex_);
Expand Down Expand Up @@ -559,14 +559,15 @@ bool highUsage() override;
MemoryAllocator* testingAllocator() const {
return allocator_;
}

MemoryAllocator* getAllocator() {
return allocator_;
}

void setAllocator(MemoryAllocator* allocator) {
allocator_ = allocator;
}

private:
static constexpr uint64_t kMB = 1 << 20;

Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/test/WriterFlushTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MockMemoryPool : public velox::memory::MemoryPool {
/*unused*/) override {
VELOX_UNSUPPORTED("freeContiguous unsupported");
}

bool highUsage() override {
VELOX_NYI("{} unsupported", __FUNCTION__);
}
Expand Down
14 changes: 9 additions & 5 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ void Writer::flush() {
finalSink_.get(),
pool_,
queryCtx_->queryConfig().dataBufferGrowRatio());
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
auto arrowProperties =
::parquet::ArrowWriterProperties::Builder().build();
PARQUET_ASSIGN_OR_THROW(
arrowWriter_,
::parquet::arrow::FileWriter::Open(
Expand All @@ -43,7 +44,9 @@ void Writer::flush() {
std::vector<std::shared_ptr<arrow::ChunkedArray>> chunks;
for (int colIdx = 0; colIdx < fields.size(); colIdx++) {
auto dataType = fields.at(colIdx)->type();
auto chunk = arrow::ChunkedArray::Make(std::move(stagingChunks_.at(colIdx)), dataType).ValueOrDie();
auto chunk = arrow::ChunkedArray::Make(
std::move(stagingChunks_.at(colIdx)), dataType)
.ValueOrDie();
chunks.push_back(chunk);
}
auto table = arrow::Table::Make(schema_, std::move(chunks), stagingRows_);
Expand All @@ -60,8 +63,8 @@ void Writer::flush() {
}

/**
* This method would cache input `ColumnarBatch` to make the size of row group big.
* It would flush when:
* This method would cache input `ColumnarBatch` to make the size of row group
* big. It would flush when:
* - the cached numRows bigger than `maxRowGroupRows_`
* - the cached bytes bigger than `maxRowGroupBytes_`
*
Expand All @@ -83,7 +86,8 @@ void Writer::write(const RowVectorPtr& data) {

auto bytes = data->estimateFlatSize();
auto numRows = data->size();
if (stagingBytes_ + bytes > maxRowGroupBytes_ || stagingRows_ + numRows > maxRowGroupRows_) {
if (stagingBytes_ + bytes > maxRowGroupBytes_ ||
stagingRows_ + numRows > maxRowGroupRows_) {
flush();
}

Expand Down
1 change: 0 additions & 1 deletion velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ bool HashBuild::reserveMemory(const RowVectorPtr& input) {
return false;
}


const auto currentUsage = pool()->getCurrentBytes();
if ((spillMemoryThreshold_ != 0 && currentUsage > spillMemoryThreshold_) ||
pool()->highUsage()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class BloomFilterAggAggregate : public exec::Aggregate {
bool /*mayPushdown*/) override {
decodeArguments(rows, args);
auto accumulator = value<BloomFilterAccumulator>(group);
//VELOX_CHECK(!decodedRaw_.mayHaveNulls());
// VELOX_CHECK(!decodedRaw_.mayHaveNulls());
if (decodedRaw_.isConstantMapping()) {
// all values are same, just do for the first
accumulator->init(capacity_);
Expand Down