Skip to content

Commit

Permalink
MergeSortingBlockInputStream use spiller (#6609)
Browse files Browse the repository at this point in the history
* MergeSortingBlockInputStream use spiller

Signed-off-by: xufei <xufei@pingcap.com>

* address comments

Signed-off-by: xufei <xufei@pingcap.com>

* address comments

Signed-off-by: xufei <xufei@pingcap.com>

* fix

Signed-off-by: xufei <xufei@pingcap.com>

* address comments

Signed-off-by: xufei <xufei@pingcap.com>

* add more ut

Signed-off-by: xufei <xufei@pingcap.com>

* more refine

Signed-off-by: xufei <xufei@pingcap.com>

* make test stable

Signed-off-by: xufei <xufei@pingcap.com>

* refine

Signed-off-by: xufei <xufei@pingcap.com>

* fix comments

Signed-off-by: xufei <xufei@pingcap.com>

* format code

Signed-off-by: xufei <xufei@pingcap.com>

Signed-off-by: xufei <xufei@pingcap.com>
  • Loading branch information
windtalker authored Jan 11, 2023
1 parent 02995ad commit 9b8503b
Show file tree
Hide file tree
Showing 17 changed files with 586 additions and 140 deletions.
42 changes: 42 additions & 0 deletions dbms/src/Core/SpillConfig.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/SpillConfig.h>
#include <Encryption/FileProvider.h>
#include <Poco/Path.h>

#include <cctype>
#include <string_view>

namespace DB
{
namespace
{
/// Tells whether `c` has to be substituted when a spill id is turned into a file name prefix.
/// Returns true for every whitespace character (as classified by std::isspace) and for each
/// character in the list below; returns false for everything else, including '\0'.
bool needReplace(char c)
{
    /// A constant view over a string literal: no run-time construction and no way to modify it by accident.
    static constexpr std::string_view forbidden_or_unusual_chars("\\/:?\"<>|,'*");
    /// std::isspace requires a value representable as unsigned char (or EOF). A plain char holding a
    /// non-ASCII byte is negative where char is signed, so it must be converted before the call.
    return std::isspace(static_cast<unsigned char>(c)) != 0
        || forbidden_or_unusual_chars.find(c) != std::string_view::npos;
}
} // namespace
/// Stores the spill settings and normalizes two of them:
///  - spill_dir always ends with the platform path separator;
///  - spill_id_as_file_name_prefix is a copy of spill_id in which every character rejected by
///    needReplace() has been turned into '_'.
/// An empty spill_dir_ makes the at() call below throw std::out_of_range.
SpillConfig::SpillConfig(const DB::String & spill_dir_, const DB::String & spill_id_, size_t max_spilled_size_per_spill_, const FileProviderPtr & file_provider_)
    : spill_dir(spill_dir_)
    , spill_id(spill_id_)
    , spill_id_as_file_name_prefix(spill_id)
    , max_spilled_size_per_spill(max_spilled_size_per_spill_)
    , file_provider(file_provider_)
{
    const char separator = Poco::Path::separator();
    const bool ends_with_separator = spill_dir.at(spill_dir.size() - 1) == separator;
    if (!ends_with_separator)
        spill_dir.push_back(separator);

    /// Sanitize the id in place so it can be used as the leading part of a file name.
    for (auto & ch : spill_id_as_file_name_prefix)
    {
        if (needReplace(ch))
            ch = '_';
    }
}
} // namespace DB
35 changes: 35 additions & 0 deletions dbms/src/Core/SpillConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>

namespace DB
{

/// Forward declaration: this header only refers to FileProvider through a shared pointer,
/// so the full class definition does not have to be included here.
class FileProvider;
/// Shared-ownership handle to a FileProvider.
using FileProviderPtr = std::shared_ptr<FileProvider>;

/// Settings used by the spilling code. All fields are public; the constructor normalizes
/// spill_dir and derives spill_id_as_file_name_prefix from spill_id.
struct SpillConfig
{
    /// Directory for spill files; the constructor appends a path separator when it is missing.
    String spill_dir;
    /// Identifier of the spill as given by the caller; it is used in log and error messages.
    String spill_id;
    /// Copy of spill_id in which whitespace and characters unsuitable for file names are replaced by '_'.
    String spill_id_as_file_name_prefix;
    /// Size limit for a single spill. NOTE(review): presumably in bytes; confirm against the code that consumes it.
    size_t max_spilled_size_per_spill;
    /// Provider handed to the buffers that write the spill files.
    FileProviderPtr file_provider;

    SpillConfig(const String & spill_dir_, const String & spill_id_, size_t max_spilled_size_per_spill_, const FileProviderPtr & file_provider_);
};
} // namespace DB
82 changes: 82 additions & 0 deletions dbms/src/Core/SpillHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Core/SpillHandler.h>

namespace DB
{

/// Creates a handler that spills into `spilled_file` on behalf of `spiller_` for partition `partition_id_`.
/// The handler takes ownership of `spilled_file`, which becomes the current file (index 0).
SpillHandler::SpillHandler(Spiller * spiller_, std::unique_ptr<SpilledFile> && spilled_file, size_t partition_id_)
    : spiller(spiller_)
    , partition_id(partition_id_)
    , current_spilled_file_index(0)
{
    /// Read the path first: after the move below `spilled_file` no longer points to the file object.
    current_spill_file_name = spilled_file->path();
    spilled_files.emplace_back(std::move(spilled_file));
}

/// Writes `blocks` to the handler's current spill file.
/// - An empty `blocks` is a no-op.
/// - The writer is created lazily on the first non-empty call, using the structure of blocks[0] as header.
/// - Spilling is rejected once the handler has failed or finished, or once the spiller is marked finished.
/// - On any failure the handler is invalidated (writer released, pending files dropped, index set to -1)
///   and an Exception carrying the file name and the original message is thrown.
void SpillHandler::spillBlocks(const Blocks & blocks)
{
    /// todo 1. set max_file_size and spill to new file if needed
    ///      2. check the disk usage
    if (unlikely(blocks.empty()))
        return;
    RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler meeting error or finished.", spiller->config.spill_id);
    try
    {
        RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
        LOG_INFO(spiller->logger, "Spilling {} blocks data into temporary file {}", blocks.size(), current_spill_file_name);
        if (unlikely(writer == nullptr))
        {
            /// First spill through this handler: open the file and emit the stream prefix.
            writer = std::make_unique<SpillWriter>(spiller->config.file_provider, current_spill_file_name, blocks[0].cloneEmpty());
            writer->out->writePrefix();
        }
        size_t total_bytes_written = 0;
        for (const auto & block : blocks)
        {
            const auto bytes_in_block = block.bytes();
            writer->out->write(block);
            /// Account for the block only after the write call has returned.
            spilled_files[current_spilled_file_index]->addSpilledDataSize(bytes_in_block);
            total_bytes_written += bytes_in_block;
        }
        LOG_INFO(spiller->logger, "Finish Spilling data into temporary file {}, spilled data size: {}", current_spill_file_name, total_bytes_written);
        /// Re-validate the state after writing.
        RUNTIME_CHECK_MSG(current_spilled_file_index >= 0, "{}: spill after the spill handler is finished.", spiller->config.spill_id);
        RUNTIME_CHECK_MSG(spiller->spill_finished == false, "{}: spill after the spiller is finished.", spiller->config.spill_id);
    }
    catch (...)
    {
        /// mark the spill handler invalid
        writer.reset();
        spilled_files.clear();
        current_spilled_file_index = -1;
        throw Exception(fmt::format("Failed to spill blocks to disk for file {}, error: {}", current_spill_file_name, getCurrentExceptionMessage(false, false)));
    }
}

/// Submits the files written by this handler to the spiller.
/// Does nothing when no writer exists, i.e. nothing was spilled, the handler was invalidated by an
/// error, or finish() has already completed. Otherwise it writes the stream suffix, flushes and
/// releases the writer, moves the spilled files into the spiller's list for `partition_id` under
/// that list's mutex, and marks the handler finished (index -1) so later spillBlocks() calls are rejected.
void SpillHandler::finish()
{
    if (likely(writer != nullptr))
    {
        writer->out->writeSuffix();
        /// Flush the buffer chain from the outermost buffer to the file before anybody else can see
        /// the files; otherwise the data could stay in memory until this handler is destroyed.
        /// NOTE(review): assumes next() pushes the buffered bytes downstream, as WriteBuffer
        /// implementations in this code base are expected to do; confirm.
        writer->compressed_buf.next();
        writer->file_buf.next();
        /// Release the writer: a repeated finish() becomes a no-op instead of writing the suffix twice.
        writer = nullptr;

        std::unique_lock lock(spiller->spilled_files[partition_id]->spilled_files_mutex);
        for (auto & spilled_file : spilled_files)
            spiller->spilled_files[partition_id]->spilled_files.push_back(std::move(spilled_file));
        spilled_files.clear();
        spiller->has_spilled_data = true;
        current_spilled_file_index = -1;
    }
}

} // namespace DB
59 changes: 59 additions & 0 deletions dbms/src/Core/SpillHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/Spiller.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Encryption/WriteBufferFromFileProvider.h>
#include <IO/CompressedWriteBuffer.h>

namespace DB
{

class IBlockOutputStream;

/// SpillHandler writes blocks into spill files on behalf of a Spiller. It is currently hidden behind
/// `Spiller::spillBlocks` and `Spiller::spillBlocksUsingBlockInputStream`, and may need to be exposed
/// for the push model.
/// NOTE 1. SpillHandler is not thread-safe, each thread should use its own spill handler
///      2. After all the data is spilled, SpillHandler::finish() must be called to submit the spilled data
class SpillHandler
{
public:
/// Takes ownership of `spilled_file`, which becomes the current file of this handler.
SpillHandler(Spiller * spiller_, std::unique_ptr<SpilledFile> && spilled_file, size_t partition_id_);
/// Writes `blocks` to the current spill file; an empty vector is a no-op.
/// On failure the handler is invalidated and an exception is thrown.
void spillBlocks(const Blocks & blocks);
/// Writes the stream suffix and moves the spilled files into the spiller's list for `partition_id`.
void finish();

private:
/// Output chain for one spill file: blocks -> out -> compressed_buf -> file_buf.
/// The member order matters: compressed_buf is constructed from file_buf and out from compressed_buf,
/// so they must be declared (and therefore constructed) in exactly this order.
struct SpillWriter
{
SpillWriter(const FileProviderPtr & file_provider, const String & file_name, const Block & header)
: file_buf(file_provider, file_name, EncryptionPath(file_name, ""))
, compressed_buf(file_buf)
, out(std::make_unique<NativeBlockOutputStream>(compressed_buf, 0, header))
{
}
WriteBufferFromFileProvider file_buf;
CompressedWriteBuffer<> compressed_buf;
std::unique_ptr<IBlockOutputStream> out;
};
/// Spiller this handler works for. NOTE(review): looks non-owning (never released here); confirm the lifetime with the caller.
Spiller * spiller;
/// Files written by this handler that have not been handed over to the spiller yet.
std::vector<std::unique_ptr<SpilledFile>> spilled_files;
/// Index of the spiller's per-partition file list that finish() appends to.
size_t partition_id;
/// Index into spilled_files of the file being written; -1 once the handler has failed or finished.
Int64 current_spilled_file_index;
/// Path of the file being written, used for the writer and in log and error messages.
String current_spill_file_name;
/// Created lazily by the first non-empty spillBlocks() call; reset when spilling fails.
std::unique_ptr<SpillWriter> writer;
};

} // namespace DB
Loading

0 comments on commit 9b8503b

Please sign in to comment.