Skip to content

Commit 1e38200

Browse files
committed
impl
1 parent 920adbf commit 1e38200

14 files changed

+130
-95
lines changed

src/Interpreters/InterpreterAlterQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
482482
case ASTAlterCommand::EXPORT_PART:
483483
{
484484
required_access.emplace_back(AccessType::ALTER_MOVE_PARTITION, database, table);
485+
required_access.emplace_back(AccessType::INSERT, command.to_database, command.to_table);
485486
break;
486487
}
487488
case ASTAlterCommand::REPLACE_PARTITION:

src/Parsers/ASTAlterQuery.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,23 @@ void ASTAlterCommand::formatImpl(WriteBuffer & ostr, const FormatSettings & sett
348348
ostr << (settings.hilite ? hilite_keyword : "") << "EXPORT " << (part ? "PART " : "PARTITION ")
349349
<< (settings.hilite ? hilite_none : "");
350350
partition->formatImpl(ostr, settings, state, frame);
351+
ostr << " TO ";
352+
switch (move_destination_type)
353+
{
354+
case DataDestinationType::TABLE:
355+
ostr << "TABLE ";
356+
if (!to_database.empty())
357+
{
358+
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_database)
359+
<< (settings.hilite ? hilite_none : "") << ".";
360+
}
361+
ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(to_table)
362+
<< (settings.hilite ? hilite_none : "");
363+
return;
364+
default:
365+
break;
366+
}
367+
351368
}
352369
else if (type == ASTAlterCommand::REPLACE_PARTITION)
353370
{

src/Parsers/CommonParsers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ namespace DB
325325
MR_MACROS(MONTHS, "MONTHS") \
326326
MR_MACROS(MOVE_PART, "MOVE PART") \
327327
MR_MACROS(MOVE_PARTITION, "MOVE PARTITION") \
328-
MR_MACROS(EXPORT_PART, "EXPORT PART") \
328+
MR_MACROS(EXPORT_PARTITION, "EXPORT PARTITION") \
329329
MR_MACROS(MOVE, "MOVE") \
330330
MR_MACROS(MS, "MS") \
331331
MR_MACROS(MUTATION, "MUTATION") \

src/Parsers/ParserAlterQuery.cpp

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
8282
ParserKeyword s_forget_partition(Keyword::FORGET_PARTITION);
8383
ParserKeyword s_move_partition(Keyword::MOVE_PARTITION);
8484
ParserKeyword s_move_part(Keyword::MOVE_PART);
85-
ParserKeyword s_export_part(Keyword::EXPORT_PART);
85+
ParserKeyword s_export_part(Keyword::EXPORT_PARTITION);
8686
ParserKeyword s_drop_detached_partition(Keyword::DROP_DETACHED_PARTITION);
8787
ParserKeyword s_drop_detached_part(Keyword::DROP_DETACHED_PART);
8888
ParserKeyword s_fetch_partition(Keyword::FETCH_PARTITION);
@@ -557,28 +557,20 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
557557
}
558558
else if (s_export_part.ignore(pos, expected))
559559
{
560-
if (!parser_string_and_substituion.parse(pos, command_partition, expected))
560+
if (!parser_partition.parse(pos, command_partition, expected))
561561
return false;
562562

563563
command->type = ASTAlterCommand::EXPORT_PART;
564-
command->part = true;
564+
// command->part = true;
565+
566+
if (!s_to_table.ignore(pos, expected))
567+
{
568+
return false;
569+
}
565570

566-
// if (s_to_disk.ignore(pos, expected))
567-
// command->move_destination_type = DataDestinationType::DISK;
568-
// else if (s_to_volume.ignore(pos, expected))
569-
// command->move_destination_type = DataDestinationType::VOLUME;
570-
// else if (s_to_shard.ignore(pos, expected))
571-
// {
572-
// command->move_destination_type = DataDestinationType::SHARD;
573-
// }
574-
// else
575-
// return false;
576-
//
577-
// ASTPtr ast_space_name;
578-
// if (!parser_string_literal.parse(pos, ast_space_name, expected))
579-
// return false;
580-
//
581-
// command->move_destination_name = ast_space_name->as<ASTLiteral &>().value.safeGet<const String &>();
571+
if (!parseDatabaseAndTableName(pos, expected, command->to_database, command->to_table))
572+
return false;
573+
command->move_destination_type = DataDestinationType::TABLE;
582574
}
583575
else if (s_add_constraint.ignore(pos, expected))
584576
{

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,10 @@
99
#include <Backups/BackupEntryWrappedWith.h>
1010
#include <Backups/IBackup.h>
1111
#include <Backups/RestorerFromBackup.h>
12-
#include <Common/Config/ConfigHelper.h>
13-
#include <Common/CurrentMetrics.h>
14-
#include <Common/Increment.h>
15-
#include <Common/ProfileEventsScope.h>
16-
#include <Common/SimpleIncrement.h>
17-
#include <Common/Stopwatch.h>
18-
#include <Common/StringUtils.h>
19-
#include <Common/ThreadFuzzer.h>
20-
#include <Common/escapeForFileName.h>
21-
#include <Common/noexcept_scope.h>
22-
#include <Common/quoteString.h>
23-
#include <Common/scope_guard_safe.h>
24-
#include <Common/typeid_cast.h>
25-
#include <Core/Settings.h>
26-
#include <Core/ServerSettings.h>
27-
#include <Storages/MergeTree/RangesInDataPart.h>
2812
#include <Compression/CompressedReadBuffer.h>
2913
#include <Core/QueryProcessingStage.h>
14+
#include <Core/ServerSettings.h>
15+
#include <Core/Settings.h>
3016
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
3117
#include <DataTypes/DataTypeEnum.h>
3218
#include <DataTypes/DataTypeLowCardinality.h>
@@ -45,47 +31,62 @@
4531
#include <IO/WriteHelpers.h>
4632
#include <Interpreters/Aggregator.h>
4733
#include <Interpreters/Context.h>
48-
#include <Interpreters/convertFieldToType.h>
49-
#include <Interpreters/evaluateConstantExpression.h>
5034
#include <Interpreters/ExpressionAnalyzer.h>
5135
#include <Interpreters/InterpreterSelectQuery.h>
5236
#include <Interpreters/MergeTreeTransaction.h>
5337
#include <Interpreters/PartLog.h>
5438
#include <Interpreters/TransactionLog.h>
5539
#include <Interpreters/TreeRewriter.h>
40+
#include <Interpreters/convertFieldToType.h>
41+
#include <Interpreters/evaluateConstantExpression.h>
5642
#include <Interpreters/inplaceBlockConversions.h>
43+
#include <Parsers/ASTAlterQuery.h>
5744
#include <Parsers/ASTExpressionList.h>
58-
#include <Parsers/ASTIndexDeclaration.h>
59-
#include <Parsers/ASTHelpers.h>
6045
#include <Parsers/ASTFunction.h>
46+
#include <Parsers/ASTHelpers.h>
47+
#include <Parsers/ASTIndexDeclaration.h>
6148
#include <Parsers/ASTLiteral.h>
6249
#include <Parsers/ASTNameTypePair.h>
6350
#include <Parsers/ASTPartition.h>
6451
#include <Parsers/ASTSetQuery.h>
6552
#include <Parsers/ASTTablesInSelectQuery.h>
6653
#include <Parsers/parseQuery.h>
6754
#include <Parsers/queryToString.h>
68-
#include <Parsers/ASTAlterQuery.h>
6955
#include <Processors/Formats/IInputFormat.h>
7056
#include <Processors/QueryPlan/QueryIdHolder.h>
7157
#include <Processors/QueryPlan/ReadFromMergeTree.h>
7258
#include <Storages/AlterCommands.h>
73-
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
7459
#include <Storages/Freeze.h>
60+
#include <Storages/MergeTree/ActiveDataPartSet.h>
7561
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
7662
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
7763
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
78-
#include <Storages/MergeTree/MergeTreeSettings.h>
79-
#include <Storages/Statistics/ConditionSelectivityEstimator.h>
64+
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
8065
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
66+
#include <Storages/MergeTree/MergeTreeSettings.h>
67+
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
68+
#include <Storages/MergeTree/RangesInDataPart.h>
8169
#include <Storages/MergeTree/checkDataPart.h>
8270
#include <Storages/MutationCommands.h>
83-
#include <Storages/MergeTree/ActiveDataPartSet.h>
71+
#include <Storages/ObjectStorage/StorageObjectStorage.h>
72+
#include <Storages/Statistics/ConditionSelectivityEstimator.h>
73+
#include <Storages/StorageFile.h>
8474
#include <Storages/StorageMergeTree.h>
8575
#include <Storages/StorageReplicatedMergeTree.h>
8676
#include <Storages/VirtualColumnUtils.h>
87-
#include <Storages/MergeTree/MergeTreeIndexGranularityAdaptive.h>
88-
#include <Storages/MergeTree/exportMTPartToParquet.h>
77+
#include <Common/Config/ConfigHelper.h>
78+
#include <Common/CurrentMetrics.h>
79+
#include <Common/Increment.h>
80+
#include <Common/ProfileEventsScope.h>
81+
#include <Common/SimpleIncrement.h>
82+
#include <Common/Stopwatch.h>
83+
#include <Common/StringUtils.h>
84+
#include <Common/ThreadFuzzer.h>
85+
#include <Common/escapeForFileName.h>
86+
#include <Common/noexcept_scope.h>
87+
#include <Common/quoteString.h>
88+
#include <Common/scope_guard_safe.h>
89+
#include <Common/typeid_cast.h>
8990

9091
#include <boost/range/algorithm_ext/erase.hpp>
9192
#include <boost/algorithm/string/join.hpp>
@@ -5597,16 +5598,7 @@ Pipe MergeTreeData::alterPartition(
55975598

55985599
case PartitionCommand::EXPORT_PART:
55995600
{
5600-
if (command.part)
5601-
{
5602-
auto part_name = command.partition->as<ASTLiteral &>().value.safeGet<String>();
5603-
auto data_part = getPartIfExists(part_name, {DataPartStates::value_type::Active});
5604-
5605-
if (!data_part)
5606-
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in committed state", part_name);
5607-
5608-
exportMTPartToParquet(*this, data_part, query_context);
5609-
}
5601+
exportPartitionToTable(command, query_context);
56105602
break;
56115603
}
56125604

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,8 @@ class MergeTreeData : public IStorage, public WithMutableContext
856856
/// Moves partition to specified Table
857857
void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context);
858858

859+
virtual void exportPartitionToTable(const PartitionCommand &, ContextPtr) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "export not implemented");}
860+
859861
/// Checks that Partition could be dropped right now
860862
/// Otherwise - throws an exception with detailed information.
861863
/// We do not use mutex because it is not very important that the size could change during the operation.

src/Storages/MergeTree/exportMTPartToParquet.h

Lines changed: 0 additions & 10 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,30 @@
1-
#include <Storages/MergeTree/exportMTPartToParquet.h>
2-
#include <Processors/QueryPlan/QueryPlan.h>
3-
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
4-
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
1+
#include <Processors/Executors/CompletedPipelineExecutor.h>
2+
#include <Processors/Executors/PullingPipelineExecutor.h>
53
#include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
4+
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
5+
#include <Processors/QueryPlan/QueryPlan.h>
6+
#include <Processors/Sinks/EmptySink.h>
67
#include <QueryPipeline/QueryPipelineBuilder.h>
7-
#include <Processors/Executors/PullingPipelineExecutor.h>
8-
#include <Processors/Executors/PushingPipelineExecutor.h>
9-
#include <Formats/FormatFactory.h>
8+
#include <Storages/MergeTree/MergeTreeSequentialSource.h>
9+
#include <Storages/MergeTree/exportMTPartToStorage.h>
1010

1111

1212
namespace DB
1313
{
1414

15-
void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, ContextPtr context)
15+
void exportMTPartToStorage(const MergeTreeData & source_data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context)
1616
{
17-
auto metadata_snapshot = data.getInMemoryMetadataPtr();
17+
auto metadata_snapshot = source_data.getInMemoryMetadataPtr();
1818
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
19-
StorageSnapshotPtr storage_snapshot = data.getStorageSnapshot(metadata_snapshot, context);
19+
StorageSnapshotPtr storage_snapshot = source_data.getStorageSnapshot(metadata_snapshot, context);
2020

2121
MergeTreeData::IMutationsSnapshot::Params params
2222
{
2323
.metadata_version = metadata_snapshot->getMetadataVersion(),
2424
.min_part_metadata_version = data_part->getMetadataVersion(),
2525
};
2626

27-
auto mutations_snapshot = data.getMutationsSnapshot(params);
27+
auto mutations_snapshot = source_data.getMutationsSnapshot(params);
2828

2929
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
3030
data_part,
@@ -44,7 +44,7 @@ void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::Data
4444
createReadFromPartStep(
4545
read_type,
4646
plan,
47-
data,
47+
source_data,
4848
storage_snapshot,
4949
data_part,
5050
alter_conversions,
@@ -62,23 +62,11 @@ void exportMTPartToParquet(const MergeTreeData & data, const MergeTreeData::Data
6262
auto builder = plan.buildQueryPipeline(optimization_settings, pipeline_settings);
6363

6464
QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
65-
auto header_block = pipeline.getHeader();
66-
67-
auto out_file_name = data_part->name + ".parquet";
68-
69-
auto out_file = std::make_shared<WriteBufferFromFile>(out_file_name);
70-
auto parquet_output = FormatFactory::instance().getOutputFormat("Parquet", *out_file, header_block, context);
71-
PullingPipelineExecutor executor(pipeline);
72-
73-
Block block;
74-
while (executor.pull(block))
75-
{
76-
parquet_output->write(block);
77-
}
7865

79-
parquet_output->finalize();
66+
pipeline.complete(std::move(dst_storage_sink));
8067

81-
out_file->finalize();
68+
CompletedPipelineExecutor executor(pipeline);
69+
executor.execute();
8270
}
8371

8472
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#pragma once
2+
3+
#include <Storages/MergeTree/MergeTreeData.h>
4+
5+
namespace DB
6+
{
7+
8+
void exportMTPartToStorage(const MergeTreeData & data, const MergeTreeData::DataPartPtr & data_part, SinkToStoragePtr dst_storage_sink, ContextPtr context);
9+
10+
}

src/Storages/ObjectStorage/S3/Configuration.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ StorageObjectStorage::QuerySettings StorageS3Configuration::getQuerySettings(con
117117
const auto & settings = context->getSettingsRef();
118118
return StorageObjectStorage::QuerySettings{
119119
.truncate_on_insert = settings[Setting::s3_truncate_on_insert],
120-
.create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert],
120+
. create_new_file_on_insert = settings[Setting::s3_create_new_file_on_insert],
121121
.schema_inference_use_cache = settings[Setting::schema_inference_use_cache_for_s3],
122122
.schema_inference_mode = settings[Setting::schema_inference_mode],
123123
.skip_empty_files = settings[Setting::s3_skip_empty_files],

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ SinkToStoragePtr StorageObjectStorage::write(
366366
configuration->getPath());
367367
}
368368

369+
// todo arthur continue from here
369370
if (configuration->withGlobsIgnorePartitionWildcard())
370371
{
371372
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED,

src/Storages/PartitionCommands.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
5757
res.type = EXPORT_PART;
5858
res.partition = command_ast->partition->clone();
5959
res.part = command_ast->part;
60+
res.move_destination_type = PartitionCommand::MoveDestinationType::TABLE;
61+
res.to_database = command_ast->to_database;
62+
res.to_table = command_ast->to_table;
6063
return res;
6164
}
6265
if (command_ast->type == ASTAlterCommand::MOVE_PARTITION)

src/Storages/StorageMergeTree.cpp

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
#include <Common/ProfileEventsScope.h>
4040
#include <Common/escapeForFileName.h>
4141
#include <IO/SharedThreadPools.h>
42+
#include <Parsers/ASTInsertQuery.h>
43+
#include <Storages/MergeTree/exportMTPartToStorage.h>
4244

4345

4446
namespace DB
@@ -2447,6 +2449,42 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
24472449
}
24482450
}
24492451

2452+
/*
2453+
* For now, this function is meant to be used when exporting to different formats (i.e, the case where data needs to be re-encoded / serialized)
2454+
* For the cases where this is not necessary, there are way more optimal ways of doing that, such as hard links implemented by `movePartitionToTable`
2455+
* */
2456+
void StorageMergeTree::exportPartitionToTable(const PartitionCommand & command, ContextPtr query_context)
2457+
{
2458+
String dest_database = query_context->resolveDatabase(command.to_database);
2459+
auto dest_storage = DatabaseCatalog::instance().getTable({dest_database, command.to_table}, query_context);
2460+
2461+
/// The target table and the source table are the same.
2462+
if (dest_storage->getStorageID() == this->getStorageID())
2463+
return;
2464+
2465+
bool async_insert = areAsynchronousInsertsEnabled();
2466+
2467+
auto query = std::make_shared<ASTInsertQuery>();
2468+
2469+
String partition_id = getPartitionIDFromQuery(command.partition, getContext());
2470+
auto src_parts = getVisibleDataPartsVectorInPartition(getContext(), partition_id);
2471+
2472+
if (src_parts.empty())
2473+
{
2474+
return;
2475+
}
2476+
2477+
auto lock1 = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]);
2478+
auto lock2 = dest_storage->lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef()[Setting::lock_acquire_timeout]);
2479+
auto merges_blocker = stopMergesAndWait();
2480+
2481+
for (const auto & data_part : src_parts)
2482+
{
2483+
auto sink = dest_storage->write(query, getInMemoryMetadataPtr(), getContext(), async_insert);
2484+
exportMTPartToStorage(*this, data_part, sink, query_context);
2485+
}
2486+
}
2487+
24502488
ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
24512489
{
24522490
if (action_type == ActionLocks::PartsMerge)

0 commit comments

Comments
 (0)