Skip to content

Write to Merge storage #683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: antalya-v25.3.3.42-lts
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions docs/en/engines/table-engines/special/merge.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Reading is automatically parallelized. Writing to a table is not supported. When
## Creating a Table {#creating-a-table}

``` sql
CREATE TABLE ... Engine=Merge(db_name, tables_regexp)
CREATE TABLE ... Engine=Merge(db_name, tables_regexp [, table_to_write])
```

## Engine Parameters {#engine-parameters}
Expand All @@ -35,6 +35,14 @@ CREATE TABLE ... Engine=Merge(db_name, tables_regexp)
Regular expressions — [re2](https://github.com/google/re2) (supports a subset of PCRE), case-sensitive.
See the notes about escaping symbols in regular expressions in the "match" section.

### table_to_write {#table_to_write}

`table_to_write` - Name of the table that receives the data when inserting into the `Merge` table.
Possible values:
- `'db_name.table_name'` - insert into the specific table in the specific database.
- `'table_name'` - insert into table `db_name.table_name`. Allowed only when the first parameter `db_name` is not a regular expression.
- `auto` - insert into the table that comes last in lexicographical order among the tables matched by `tables_regexp`. Allowed only when the first parameter `db_name` is not a regular expression.

## Usage {#usage}

When selecting tables to read, the `Merge` table itself is not selected, even if it matches the regex. This is to avoid loops.
Expand Down Expand Up @@ -65,7 +73,7 @@ CREATE TABLE WatchLog_new(date Date, UserId Int64, EventType String, Cnt UInt64)
ENGINE=MergeTree PARTITION BY date ORDER BY (UserId, EventType) SETTINGS index_granularity=8192;
INSERT INTO WatchLog_new VALUES ('2018-01-02', 2, 'hit', 3);

CREATE TABLE WatchLog as WatchLog_old ENGINE=Merge(currentDatabase(), '^WatchLog');
CREATE TABLE WatchLog as WatchLog_old ENGINE=Merge(currentDatabase(), '^WatchLog', 'WatchLog_new');

SELECT * FROM WatchLog;
```
Expand All @@ -79,6 +87,22 @@ SELECT * FROM WatchLog;
└────────────┴────────┴───────────┴─────┘
```

Inserts into table `WatchLog` go into table `WatchLog_new`:
```sql
INSERT INTO WatchLog VALUES ('2018-01-03', 3, 'hit', 3);

SELECT * FROM WatchLog_new;
```

```text
┌───────date─┬─UserId─┬─EventType─┬─Cnt─┐
│ 2018-01-02 │ 2 │ hit │ 3 │
└────────────┴────────┴───────────┴─────┘
┌───────date─┬─UserId─┬─EventType─┬─Cnt─┐
│ 2018-01-03 │ 3 │ hit │ 3 │
└────────────┴────────┴───────────┴─────┘
```

## Virtual Columns {#virtual-columns}

- `_table` — Contains the name of the table from which data was read. Type: [String](../../../sql-reference/data-types/string.md).
Expand Down
134 changes: 130 additions & 4 deletions src/Storages/StorageMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
Expand Down Expand Up @@ -83,6 +84,9 @@ extern const int SAMPLING_NOT_SUPPORTED;
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int STORAGE_REQUIRES_PARAMETER;
extern const int UNKNOWN_TABLE;
extern const int ACCESS_DENIED;
extern const int TABLE_IS_READ_ONLY;
}

namespace
Expand Down Expand Up @@ -143,6 +147,8 @@ StorageMerge::StorageMerge(
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const DBToTableSetMap & source_databases_and_tables_,
const std::optional<String> & table_to_write_,
bool table_to_write_auto_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
Expand All @@ -151,6 +157,7 @@ StorageMerge::StorageMerge(
database_is_regexp_,
source_database_name_or_regexp_, {},
source_databases_and_tables_)
, table_to_write_auto(table_to_write_auto_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_.empty()
Expand All @@ -159,6 +166,8 @@ StorageMerge::StorageMerge(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(createVirtuals());
if (!table_to_write_auto)
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
}

StorageMerge::StorageMerge(
Expand All @@ -168,6 +177,8 @@ StorageMerge::StorageMerge(
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const String & source_table_regexp_,
const std::optional<String> & table_to_write_,
bool table_to_write_auto_,
ContextPtr context_)
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
Expand All @@ -176,6 +187,7 @@ StorageMerge::StorageMerge(
database_is_regexp_,
source_database_name_or_regexp_,
source_table_regexp_, {})
, table_to_write_auto(table_to_write_auto_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_.empty()
Expand All @@ -184,6 +196,8 @@ StorageMerge::StorageMerge(
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(createVirtuals());
if (!table_to_write_auto)
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
}

StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
Expand Down Expand Up @@ -293,6 +307,29 @@ void StorageMerge::forEachTable(F && func) const
});
}

template <typename F>
void StorageMerge::forEachTableName(F && func) const
{
auto database_table_iterators = database_name_or_regexp.getDatabaseIterators(getContext());

for (auto & iterator : database_table_iterators)
{
while (iterator->isValid())
{
const auto & table = iterator->table();
if (table.get() != this)
{
QualifiedTableName table_name;
table_name.database = iterator->databaseName();
table_name.table = iterator->name();
func(table_name);
}

iterator->next();
}
}
}

bool StorageMerge::isRemote() const
{
auto first_remote_table = traverseTablesUntil([](const StoragePtr & table) { return table && table->isRemote(); });
Expand Down Expand Up @@ -1702,6 +1739,77 @@ std::optional<UInt64> StorageMerge::totalRowsOrBytes(F && func) const
return first_table ? std::nullopt : std::make_optional(total_rows_or_bytes);
}

/// Stores the explicit destination table for inserts.
///
/// - No value: the stored destination is cleared.
/// - 'db.table': used as is.
/// - 'table': the database is taken from `source_database_name_or_regexp_`,
///   which is possible only when that argument is a plain name.
///
/// Throws BAD_ARGUMENTS when the database part is missing and the source
/// database is given as a regular expression.
void StorageMerge::setTableToWrite(
    const std::optional<String> & table_to_write_,
    const String & source_database_name_or_regexp_,
    bool database_is_regexp_)
{
    if (!table_to_write_)
    {
        table_to_write = std::nullopt;
        return;
    }

    auto target = QualifiedTableName::parseFromString(*table_to_write_);
    const bool database_missing = target.database.empty();

    /// A regexp cannot serve as the default database of the destination.
    if (database_missing && database_is_regexp_)
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument 'table_to_write' must contain database if 'db_name' is regular expression");

    if (database_missing)
        target.database = source_database_name_or_regexp_;

    table_to_write = target;
}

/// Redirects an INSERT into the Merge table to a single underlying table.
///
/// Destination selection:
/// - `auto` mode: among the tables enumerated by forEachTableName(), picks the one
///   with the greatest full name (string comparison) for which the current user
///   has the INSERT grant.
/// - explicit mode: uses the table configured by setTableToWrite().
///
/// Throws:
/// - ACCESS_DENIED      - `auto` mode, tables exist but none is writable by the user;
/// - UNKNOWN_TABLE      - `auto` mode, no candidate tables at all;
/// - TABLE_IS_READ_ONLY - explicit mode without a configured destination;
/// - whatever checkAccess() throws when INSERT is not granted on the configured table.
///
/// Returns the sink of the destination table; the sink holds a share lock on that
/// table.
SinkToStoragePtr StorageMerge::write(
    const ASTPtr & query,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context_,
    bool async_insert)
{
    const auto & access = context_->getAccess();

    /// The destination is resolved into a local variable on every call.
    /// Mutating the `table_to_write` member here would be a data race when
    /// several INSERT queries write to the same storage object at once.
    std::optional<QualifiedTableName> target;

    if (table_to_write_auto)
    {
        bool any_table_found = false;
        forEachTableName([&](const auto & table_name)
        {
            any_table_found = true;
            if (!target.has_value() || target->getFullName() < table_name.getFullName())
            {
                /// Tables without the INSERT grant are skipped rather than failing the query.
                if (access->isGranted(AccessType::INSERT, table_name.database, table_name.table))
                    target = table_name;
            }
        });

        if (!target.has_value())
        {
            if (any_table_found)
                throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to write in any suitable table for storage {}", getName());
            else
                throw Exception(ErrorCodes::UNKNOWN_TABLE, "Can't find any table to write for storage {}", getName());
        }
    }
    else
    {
        if (!table_to_write.has_value())
            throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Method write is not allowed in storage {} without described table to write", getName());

        access->checkAccess(AccessType::INSERT, table_to_write->database, table_to_write->table);
        target = table_to_write;
    }

    auto database = DatabaseCatalog::instance().getDatabase(target->database);
    auto table = database->getTable(target->table, context_);
    auto table_lock = table->lockForShare(
        context_->getInitialQueryId(),
        context_->getSettingsRef()[Setting::lock_acquire_timeout]);

    /// NOTE(review): the metadata snapshot of the Merge table is forwarded to the
    /// destination table; confirm whether the destination's own snapshot should be used.
    auto sink = table->write(query, metadata_snapshot, context_, async_insert);

    /// Attach the lock to the sink so it is held together with the sink.
    sink->addTableLock(table_lock);
    return sink;
}

void registerStorageMerge(StorageFactory & factory)
{
factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
Expand All @@ -1712,10 +1820,12 @@ void registerStorageMerge(StorageFactory & factory)

ASTs & engine_args = args.engine_args;

if (engine_args.size() != 2)
size_t size = engine_args.size();

if (size < 2 || size > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage Merge requires exactly 2 parameters - name "
"of source database and regexp for table names.");
"Storage Merge requires 2 or 3 parameters - name "
"of source database, regexp for table names, and optional table name for writing");

auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName(engine_args[0], args.getLocalContext());

Expand All @@ -1727,8 +1837,24 @@ void registerStorageMerge(StorageFactory & factory)
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");

std::optional<String> table_to_write = std::nullopt;
bool table_to_write_auto = false;
if (size == 3)
{
bool is_identifier = engine_args[2]->as<ASTIdentifier>();
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
table_to_write = checkAndGetLiteralArgument<String>(engine_args[2], "table_to_write");
if (is_identifier && table_to_write == "auto")
{
if (is_regexp)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RegExp for database with auto table_to_write is forbidden");
table_to_write_auto = true;
}
}

return std::make_shared<StorageMerge>(
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, args.getLocalContext());
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp,
table_name_regexp, table_to_write, table_to_write_auto, args.getLocalContext());
},
{
.supports_schema_inference = true
Expand Down
21 changes: 21 additions & 0 deletions src/Storages/StorageMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class StorageMerge final : public IStorage, WithContext
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const DBToTableSetMap & source_databases_and_tables_,
const std::optional<String> & table_to_write_,
bool table_to_write_auto_,
ContextPtr context_);

StorageMerge(
Expand All @@ -39,6 +41,8 @@ class StorageMerge final : public IStorage, WithContext
const String & source_database_name_or_regexp_,
bool database_is_regexp_,
const String & source_table_regexp_,
const std::optional<String> & table_to_write_,
bool table_to_write_auto_,
ContextPtr context_);

std::string getName() const override { return "Merge"; }
Expand Down Expand Up @@ -70,6 +74,12 @@ class StorageMerge final : public IStorage, WithContext
size_t max_block_size,
size_t num_streams) override;

SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;

void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

/// you need to add and remove columns in the sub-tables manually
Expand Down Expand Up @@ -122,12 +132,18 @@ class StorageMerge final : public IStorage, WithContext

DatabaseNameOrRegexp database_name_or_regexp;

std::optional<QualifiedTableName> table_to_write;
bool table_to_write_auto = false;

template <typename F>
StoragePtr traverseTablesUntil(F && predicate) const;

template <typename F>
void forEachTable(F && func) const;

template <typename F>
void forEachTableName(F && func) const;

template <typename F>
static StoragePtr traverseTablesUntilImpl(const ContextPtr & query_context, const IStorage * ignore_self, const DatabaseNameOrRegexp & database_name_or_regexp, F && predicate);

Expand All @@ -149,6 +165,11 @@ class StorageMerge final : public IStorage, WithContext
template <typename F>
std::optional<UInt64> totalRowsOrBytes(F && func) const;

void setTableToWrite(
const std::optional<String> & table_to_write_,
const String & source_database_name_or_regexp_,
bool database_is_regexp_);

friend class ReadFromMerge;
};

Expand Down
3 changes: 3 additions & 0 deletions src/TableFunctions/TableFunctionMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,16 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr contex

StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{
std::optional<std::string> table_to_write = std::nullopt;
auto res = std::make_shared<StorageMerge>(
StorageID(getDatabaseName(), table_name),
ColumnsDescription{},
String{},
source_database_name_or_regexp,
database_is_regexp,
source_table_regexp,
table_to_write,
false,
context);

res->startup();
Expand Down
44 changes: 44 additions & 0 deletions tests/queries/0_stateless/03373_write_to_merge_table.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
2 1
2 2
2 3
1 1
2 1
2 2
2 3
1 1
2 1
2 2
2 3
1 1
2 1
2 2
2 3
1 1
2 1
2 2
2 3
1 1
2 1
2 2
2 3
4
2 1
2 2
2 3
3 1
4
2 1
2 2
2 3
3 1
1
3 2
4
2 1
2 2
2 3
3 1
0
2
3 2
3 3
Loading
Loading