Skip to content

Commit

Permalink
Support user to choose different compression algorithms and compressi…
Browse files Browse the repository at this point in the history
…on levels (#4161)

close #3671
  • Loading branch information
hehechen authored Mar 2, 2022
1 parent fa48fa2 commit d321064
Show file tree
Hide file tree
Showing 16 changed files with 177 additions and 219 deletions.
2 changes: 1 addition & 1 deletion contrib/zstd
Submodule zstd updated 458 files
91 changes: 59 additions & 32 deletions contrib/zstd-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,53 +50,80 @@ GetLibraryVersion("${HEADER_CONTENT}" LIBVER_MAJOR LIBVER_MINOR LIBVER_RELEASE)
MESSAGE(STATUS "ZSTD VERSION ${LIBVER_MAJOR}.${LIBVER_MINOR}.${LIBVER_RELEASE}")

SET(Sources
${LIBRARY_DIR}/dictBuilder/cover.c
${LIBRARY_DIR}/dictBuilder/zdict.c
${LIBRARY_DIR}/dictBuilder/divsufsort.c
${LIBRARY_DIR}/dictBuilder/fastcover.c
${LIBRARY_DIR}/compress/zstd_fast.c
${LIBRARY_DIR}/compress/fse_compress.c
${LIBRARY_DIR}/compress/hist.c
${LIBRARY_DIR}/compress/zstd_lazy.c
${LIBRARY_DIR}/compress/zstd_compress_superblock.c
${LIBRARY_DIR}/compress/zstdmt_compress.c
${LIBRARY_DIR}/compress/zstd_ldm.c
${LIBRARY_DIR}/compress/zstd_compress_literals.c
${LIBRARY_DIR}/compress/zstd_compress.c
${LIBRARY_DIR}/compress/zstd_opt.c
${LIBRARY_DIR}/compress/zstd_compress_sequences.c
${LIBRARY_DIR}/compress/huf_compress.c
${LIBRARY_DIR}/compress/zstd_double_fast.c
${LIBRARY_DIR}/common/entropy_common.c
${LIBRARY_DIR}/common/error_private.c
${LIBRARY_DIR}/common/debug.c
${LIBRARY_DIR}/common/fse_decompress.c
${LIBRARY_DIR}/common/pool.c
${LIBRARY_DIR}/common/error_private.c
${LIBRARY_DIR}/common/threading.c
${LIBRARY_DIR}/common/xxhash.c
${LIBRARY_DIR}/common/zstd_common.c
${LIBRARY_DIR}/compress/fse_compress.c
${LIBRARY_DIR}/compress/huf_compress.c
${LIBRARY_DIR}/compress/zstd_compress.c
${LIBRARY_DIR}/compress/zstd_double_fast.c
${LIBRARY_DIR}/compress/zstd_fast.c
${LIBRARY_DIR}/compress/zstd_lazy.c
${LIBRARY_DIR}/compress/zstd_ldm.c
${LIBRARY_DIR}/compress/zstdmt_compress.c
${LIBRARY_DIR}/compress/zstd_opt.c
${LIBRARY_DIR}/common/xxhash.c
${LIBRARY_DIR}/deprecated/zbuff_decompress.c
${LIBRARY_DIR}/deprecated/zbuff_compress.c
${LIBRARY_DIR}/deprecated/zbuff_common.c
${LIBRARY_DIR}/decompress/zstd_decompress_block.c
${LIBRARY_DIR}/decompress/huf_decompress.c
${LIBRARY_DIR}/decompress/zstd_decompress.c
${LIBRARY_DIR}/deprecated/zbuff_common.c
${LIBRARY_DIR}/deprecated/zbuff_compress.c
${LIBRARY_DIR}/deprecated/zbuff_decompress.c
${LIBRARY_DIR}/dictBuilder/cover.c
${LIBRARY_DIR}/dictBuilder/divsufsort.c
${LIBRARY_DIR}/dictBuilder/zdict.c)
${LIBRARY_DIR}/decompress/zstd_ddict.c
${LIBRARY_DIR}/decompress/huf_decompress_amd64.S)

SET(Headers
${LIBRARY_DIR}/common/bitstream.h
${LIBRARY_DIR}/dictBuilder/cover.h
${LIBRARY_DIR}/dictBuilder/divsufsort.h
${LIBRARY_DIR}/compress/zstd_ldm_geartab.h
${LIBRARY_DIR}/compress/zstd_compress_internal.h
${LIBRARY_DIR}/compress/zstd_ldm.h
${LIBRARY_DIR}/compress/zstd_double_fast.h
${LIBRARY_DIR}/compress/clevels.h
${LIBRARY_DIR}/compress/zstd_lazy.h
${LIBRARY_DIR}/compress/zstdmt_compress.h
${LIBRARY_DIR}/compress/zstd_fast.h
${LIBRARY_DIR}/compress/zstd_opt.h
${LIBRARY_DIR}/compress/hist.h
${LIBRARY_DIR}/compress/zstd_cwksp.h
${LIBRARY_DIR}/compress/zstd_compress_superblock.h
${LIBRARY_DIR}/compress/zstd_compress_sequences.h
${LIBRARY_DIR}/compress/zstd_compress_literals.h
${LIBRARY_DIR}/common/portability_macros.h
${LIBRARY_DIR}/common/error_private.h
${LIBRARY_DIR}/common/fse.h
${LIBRARY_DIR}/common/huf.h
${LIBRARY_DIR}/common/mem.h
${LIBRARY_DIR}/common/compiler.h
${LIBRARY_DIR}/common/bitstream.h
${LIBRARY_DIR}/common/debug.h
${LIBRARY_DIR}/common/pool.h
${LIBRARY_DIR}/common/threading.h
${LIBRARY_DIR}/common/fse.h
${LIBRARY_DIR}/common/xxhash.h
${LIBRARY_DIR}/common/zstd_errors.h
${LIBRARY_DIR}/common/threading.h
${LIBRARY_DIR}/common/zstd_trace.h
${LIBRARY_DIR}/common/cpu.h
${LIBRARY_DIR}/common/zstd_internal.h
${LIBRARY_DIR}/compress/zstd_double_fast.h
${LIBRARY_DIR}/compress/zstd_fast.h
${LIBRARY_DIR}/compress/zstd_lazy.h
${LIBRARY_DIR}/compress/zstd_ldm.h
${LIBRARY_DIR}/compress/zstdmt_compress.h
${LIBRARY_DIR}/compress/zstd_opt.h
${LIBRARY_DIR}/compress/zstd_ldm.h
${LIBRARY_DIR}/common/zstd_deps.h
${LIBRARY_DIR}/deprecated/zbuff.h
${LIBRARY_DIR}/dictBuilder/divsufsort.h
${LIBRARY_DIR}/dictBuilder/zdict.h
${LIBRARY_DIR}/zstd.h)
${LIBRARY_DIR}/zstd.h
${LIBRARY_DIR}/legacy/zstd_legacy.h
${LIBRARY_DIR}/zdict.h
${LIBRARY_DIR}/zstd_errors.h
${LIBRARY_DIR}/decompress/zstd_decompress_block.h
${LIBRARY_DIR}/decompress/zstd_ddict.h
${LIBRARY_DIR}/decompress/zstd_decompress_internal.h)

SET(ZSTD_LEGACY_SUPPORT true)

Expand Down Expand Up @@ -124,7 +151,7 @@ IF (ZSTD_LEGACY_SUPPORT)
${LIBRARY_LEGACY_DIR}/zstd_v06.h
${LIBRARY_LEGACY_DIR}/zstd_v07.h)
ENDIF (ZSTD_LEGACY_SUPPORT)

ENABLE_LANGUAGE(ASM)
ADD_LIBRARY(zstd ${Sources} ${Headers})

target_include_directories (zstd PUBLIC ${LIBRARY_DIR})
2 changes: 1 addition & 1 deletion dbms/src/IO/CompressedWriteBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void CompressedWriteBuffer<add_checksum>::nextImpl()
compressed_buffer[0] = static_cast<UInt8>(CompressionMethodByte::LZ4);

if (compression_settings.method == CompressionMethod::LZ4)
compressed_size = header_size + LZ4_compress_default(working_buffer.begin(), &compressed_buffer[header_size], uncompressed_size, LZ4_COMPRESSBOUND(uncompressed_size));
compressed_size = header_size + LZ4_compress_fast(working_buffer.begin(), &compressed_buffer[header_size], uncompressed_size, LZ4_COMPRESSBOUND(uncompressed_size), compression_settings.level);
else
compressed_size = header_size + LZ4_compress_HC(working_buffer.begin(), &compressed_buffer[header_size], uncompressed_size, LZ4_COMPRESSBOUND(uncompressed_size), compression_settings.level);

Expand Down
5 changes: 3 additions & 2 deletions dbms/src/IO/CompressionSettings.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "CompressionSettings.h"

#include <Interpreters/Settings.h>
#include <lz4hc.h>


namespace DB
Expand All @@ -23,9 +24,9 @@ int CompressionSettings::getDefaultLevel(CompressionMethod method)
switch (method)
{
case CompressionMethod::LZ4:
return -1;
return 1;
case CompressionMethod::LZ4HC:
return 0;
return LZ4HC_CLEVEL_DEFAULT;
case CompressionMethod::ZSTD:
return 1;
default:
Expand Down
38 changes: 8 additions & 30 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include <Poco/Net/IPAddress.h>
#include <Poco/UUID.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/CompressionSettingsSelector.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/IStorage.h>
Expand Down Expand Up @@ -140,8 +139,6 @@ struct ContextShared
mutable TMTContextPtr tmt_context; /// Context of TiFlash. Note that this should be free before background_pool.
MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<Compiler> compiler; /// Used for dynamic compilation of queries' parts if it necessary.
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionSettingsSelector> compression_settings_selector;
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
String format_schema_path; /// Path to a directory that contains schema files used by input formats.

Expand Down Expand Up @@ -191,7 +188,7 @@ struct ContextShared

Context::ConfigReloadCallback config_reload_callback;

ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
explicit ContextShared(std::shared_ptr<IRuntimeComponentsFactory> runtime_components_factory_)
: runtime_components_factory(std::move(runtime_components_factory_))
{
/// TODO: make it singleton (?)
Expand Down Expand Up @@ -308,7 +305,7 @@ const ProcessList & Context::getProcessList() const
}


const Databases Context::getDatabases() const
Databases Context::getDatabases() const
{
auto lock = getLock();
return shared->databases;
Expand All @@ -323,7 +320,7 @@ Databases Context::getDatabases()

Context::SessionKey Context::getSessionKey(const String & session_id) const
{
auto & user_name = client_info.current_user;
const auto & user_name = client_info.current_user;

if (user_name.empty())
throw Exception("Empty user name.", ErrorCodes::LOGICAL_ERROR);
Expand Down Expand Up @@ -438,7 +435,7 @@ static String resolveDatabase(const String & database_name, const String & curre
}


const DatabasePtr Context::getDatabase(const String & database_name) const
DatabasePtr Context::getDatabase(const String & database_name) const
{
auto lock = getLock();
String db = resolveDatabase(database_name, current_database);
Expand All @@ -454,7 +451,7 @@ DatabasePtr Context::getDatabase(const String & database_name)
return shared->databases[db];
}

const DatabasePtr Context::tryGetDatabase(const String & database_name) const
DatabasePtr Context::tryGetDatabase(const String & database_name) const
{
auto lock = getLock();
String db = resolveDatabase(database_name, current_database);
Expand Down Expand Up @@ -818,7 +815,7 @@ Tables Context::getExternalTables() const
auto lock = getLock();

Tables res;
for (auto & table : external_tables)
for (const auto & table : external_tables)
res[table.first] = table.second.first;

if (session_context && session_context != this)
Expand Down Expand Up @@ -850,7 +847,7 @@ StoragePtr Context::getTable(const String & database_name, const String & table_
Exception exc;
auto res = getTableImpl(database_name, table_name, &exc);
if (!res)
throw exc;
throw Exception(exc);
return res;
}

Expand Down Expand Up @@ -1516,7 +1513,7 @@ SchemaSyncServicePtr & Context::getSchemaSyncService()
return shared->schema_sync_service;
}

void Context::initializeTiFlashMetrics()
void Context::initializeTiFlashMetrics() const
{
auto lock = getLock();
(void)TiFlashMetrics::instance();
Expand Down Expand Up @@ -1618,25 +1615,6 @@ QueryLog * Context::getQueryLog()
}


CompressionSettings Context::chooseCompressionSettings(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();

if (!shared->compression_settings_selector)
{
constexpr auto config_name = "compression";
auto & config = getConfigRef();

if (config.has(config_name))
shared->compression_settings_selector = std::make_unique<CompressionSettingsSelector>(config, "compression");
else
shared->compression_settings_selector = std::make_unique<CompressionSettingsSelector>();
}

return shared->compression_settings_selector->choose(part_size, part_size_ratio);
}


void Context::setMaxTableSizeToDrop(size_t max_size)
{
// Is initialized at server startup
Expand Down
15 changes: 6 additions & 9 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ class Context
* when assertTableDoesntExist or assertDatabaseExists is called inside another function that already
* made this check.
*/
void assertTableDoesntExist(const String & database_name, const String & table_name, bool check_database_acccess_rights = true) const;
void assertDatabaseExists(const String & database_name, bool check_database_acccess_rights = true) const;
void assertTableDoesntExist(const String & database_name, const String & table_name, bool check_database_access_rights = true) const;
void assertDatabaseExists(const String & database_name, bool check_database_access_rights = true) const;

void assertDatabaseDoesntExist(const String & database_name) const;
void checkDatabaseAccessRights(const std::string & database_name) const;
Expand Down Expand Up @@ -278,12 +278,12 @@ class Context
ASTPtr getCreateExternalTableQuery(const String & table_name) const;
ASTPtr getCreateDatabaseQuery(const String & database_name) const;

const DatabasePtr getDatabase(const String & database_name) const;
DatabasePtr getDatabase(const String & database_name) const;
DatabasePtr getDatabase(const String & database_name);
const DatabasePtr tryGetDatabase(const String & database_name) const;
DatabasePtr tryGetDatabase(const String & database_name) const;
DatabasePtr tryGetDatabase(const String & database_name);

const Databases getDatabases() const;
Databases getDatabases() const;
Databases getDatabases();

std::shared_ptr<Context> acquireSession(
Expand Down Expand Up @@ -384,7 +384,7 @@ class Context
const std::vector<size_t> & latest_capacity_quota);
PathCapacityMetricsPtr getPathCapacity() const;

void initializeTiFlashMetrics();
void initializeTiFlashMetrics() const;

void initializeFileProvider(KeyManagerPtr key_manager, bool enable_encryption);
FileProviderPtr getFileProvider() const;
Expand All @@ -406,9 +406,6 @@ class Context
void setMaxTableSizeToDrop(size_t max_size);
void checkTableCanBeDropped(const String & database, const String & table, size_t table_size);

/// Lets you select the compression settings according to the conditions described in the configuration file.
CompressionSettings chooseCompressionSettings(size_t part_size, double part_size_ratio) const;

/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ struct Settings
M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4, "Allows you to select the method of data compression when writing.") \
\
M(SettingInt64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.") \
\
M(SettingUInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.") \
\
M(SettingBool, log_queries, 0, "Log requests and write the log to the system table.") \
Expand Down Expand Up @@ -304,7 +303,8 @@ struct Settings
\
M(SettingUInt64, dt_checksum_frame_size, DBMS_DEFAULT_BUFFER_SIZE, "Frame size for delta tree stable storage") \
M(SettingChecksumAlgorithm, dt_checksum_algorithm, ChecksumAlgo::XXH3, "Checksum algorithm for delta tree stable storage") \
\
M(SettingCompressionMethod, dt_compression_method, CompressionMethod::LZ4, "The method of data compression when writing.") \
M(SettingInt64, dt_compression_level, 1, "The compression level.") \
M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \
M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \
M(SettingOverflowMode<false>, set_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Interpreters/SettingsCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/String.h>
#include <Poco/Timespan.h>


Expand Down Expand Up @@ -833,11 +834,12 @@ struct SettingCompressionMethod

static CompressionMethod getCompressionMethod(const String & s)
{
if (s == "lz4")
String lower_str = Poco::toLower(s);
if (lower_str == "lz4")
return CompressionMethod::LZ4;
if (s == "lz4hc")
if (lower_str == "lz4hc")
return CompressionMethod::LZ4HC;
if (s == "zstd")
if (lower_str == "zstd")
return CompressionMethod::ZSTD;

throw Exception("Unknown compression method: '" + s + "', must be one of 'lz4', 'lz4hc', 'zstd'", ErrorCodes::UNKNOWN_COMPRESSION_METHOD);
Expand Down
Loading

0 comments on commit d321064

Please sign in to comment.