Skip to content

Commit f2ae2cc

Browse files
committed
lock part inside task
1 parent 0fdc716 commit f2ae2cc

File tree

7 files changed

+138
-32
lines changed

7 files changed

+138
-32
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include <Storages/MergeTree/ExportPartFromPartitionExportTask.h>
2+
#include <Common/ProfileEvents.h>
3+
4+
namespace ProfileEvents
5+
{
6+
extern const Event ExportPartitionZooKeeperRequests;
7+
extern const Event ExportPartitionZooKeeperGetChildren;
8+
extern const Event ExportPartitionZooKeeperCreate;
9+
}
10+
namespace DB
11+
{
12+
13+
ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask(
14+
StorageReplicatedMergeTree & storage_,
15+
const std::string & key_,
16+
const MergeTreePartExportManifest & manifest_,
17+
ContextPtr context_)
18+
: storage(storage_),
19+
key(key_),
20+
manifest(manifest_),
21+
local_context(context_)
22+
{
23+
export_part_task = std::make_shared<ExportPartTask>(storage, manifest, local_context);
24+
}
25+
26+
bool ExportPartFromPartitionExportTask::executeStep()
27+
{
28+
const auto zk = storage.getZooKeeper();
29+
const auto part_name = manifest.data_part->name;
30+
31+
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
32+
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
33+
if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
34+
{
35+
export_part_task->executeStep();
36+
return false;
37+
}
38+
39+
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", part_name);
40+
return false;
41+
}
42+
43+
void ExportPartFromPartitionExportTask::cancel() noexcept
44+
{
45+
export_part_task->cancel();
46+
}
47+
48+
void ExportPartFromPartitionExportTask::onCompleted()
49+
{
50+
export_part_task->onCompleted();
51+
}
52+
53+
StorageID ExportPartFromPartitionExportTask::getStorageID() const
54+
{
55+
return export_part_task->getStorageID();
56+
}
57+
58+
Priority ExportPartFromPartitionExportTask::getPriority() const
59+
{
60+
return export_part_task->getPriority();
61+
}
62+
63+
String ExportPartFromPartitionExportTask::getQueryId() const
64+
{
65+
return export_part_task->getQueryId();
66+
}
67+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
3+
#include <Storages/MergeTree/IExecutableTask.h>
4+
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
5+
#include <Storages/StorageReplicatedMergeTree.h>
6+
#include <Storages/MergeTree/ExportPartTask.h>
7+
8+
namespace DB
9+
{
10+
11+
class ExportPartFromPartitionExportTask : public IExecutableTask
12+
{
13+
public:
14+
explicit ExportPartFromPartitionExportTask(
15+
StorageReplicatedMergeTree & storage_,
16+
const std::string & key_,
17+
const MergeTreePartExportManifest & manifest_,
18+
ContextPtr context_);
19+
bool executeStep() override;
20+
void onCompleted() override;
21+
StorageID getStorageID() const override;
22+
Priority getPriority() const override;
23+
String getQueryId() const override;
24+
25+
void cancel() noexcept override;
26+
27+
private:
28+
StorageReplicatedMergeTree & storage;
29+
std::string key;
30+
MergeTreePartExportManifest manifest;
31+
ContextPtr local_context;
32+
std::shared_ptr<ExportPartTask> export_part_task;
33+
};
34+
35+
}

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
101101
{
102102
}
103103

104+
const MergeTreePartExportManifest & ExportPartTask::getManifest() const
105+
{
106+
return manifest;
107+
}
108+
104109
bool ExportPartTask::executeStep()
105110
{
106111
auto local_context = Context::createCopy(storage.getContext());

src/Storages/MergeTree/ExportPartTask.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class ExportPartTask : public IExecutableTask
1818
StorageID getStorageID() const override;
1919
Priority getPriority() const override;
2020
String getQueryId() const override;
21+
const MergeTreePartExportManifest & getManifest() const;
2122

2223
void cancel() noexcept override;
2324

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
#include <Common/ProfileEvents.h>
88
#include "Storages/MergeTree/ExportPartitionUtils.h"
99
#include "Storages/MergeTree/MergeTreePartExportManifest.h"
10+
#include "Storages/MergeTree/ExportPartFromPartitionExportTask.h"
11+
#include "Formats/FormatFactory.h"
12+
#include <Core/Settings.h>
1013

1114
namespace ProfileEvents
1215
{
@@ -24,6 +27,11 @@ namespace ProfileEvents
2427
namespace DB
2528
{
2629

30+
namespace Setting
31+
{
32+
extern const SettingsMergeTreePartExportFileAlreadyExistsPolicy export_merge_tree_part_file_already_exists_policy;
33+
}
34+
2735
namespace ErrorCodes
2836
{
2937
extern const int QUERY_WAS_CANCELLED;
@@ -156,41 +164,29 @@ void ExportPartitionTaskScheduler::run()
156164
continue;
157165
}
158166

159-
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
160-
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate);
161-
if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral))
162-
{
163-
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name);
164-
continue;
165-
}
167+
std::lock_guard part_export_lock(storage.export_manifests_mutex);
166168

167-
try
168-
{
169-
storage.exportPartToTable(
170-
part->name,
171-
destination_storage_id,
172-
manifest.transaction_id,
173-
getContextCopyWithTaskSettings(storage.getContext(), manifest),
174-
/*allow_outdated_parts*/ true,
175-
[this, key, zk_part_name, manifest, destination_storage]
176-
(MergeTreePartExportManifest::CompletionCallbackResult result)
177-
{
178-
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
179-
});
180-
}
181-
catch (const Exception &)
182-
{
183-
tryLogCurrentException(__PRETTY_FUNCTION__);
184-
zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name);
185-
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
186-
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove);
187-
/// we should not increment retry_count because the node might just be full
188-
}
169+
auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest);
170+
171+
const auto format_settings = getFormatSettings(context);
172+
173+
MergeTreePartExportManifest part_export_manifest(
174+
destination_storage->getStorageID(),
175+
part,
176+
manifest.transaction_id,
177+
context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
178+
format_settings,
179+
storage.getInMemoryMetadataPtr(),
180+
[this, key, zk_part_name, manifest, destination_storage]
181+
(MergeTreePartExportManifest::CompletionCallbackResult result)
182+
{
183+
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
184+
});
185+
186+
storage.background_moves_assignee.scheduleMoveTask(
187+
std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest, getContextCopyWithTaskSettings(storage.getContext(), manifest)));
189188
}
190189
}
191-
192-
/// maybe we failed to schedule or failed to export, need to retry eventually
193-
storage.export_merge_tree_partition_select_task->scheduleAfter(1000 * 5);
194190
}
195191

196192
void ExportPartitionTaskScheduler::handlePartExportCompletion(

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,6 +1404,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
14041404
friend class IMergedBlockOutputStream; // for access to log
14051405
friend struct DataPartsLock; // for access to shared_parts_list/shared_ranges_in_parts
14061406
friend class ExportPartTask;
1407+
friend class ExportPartFromPartitionExportTask;
14071408

14081409
bool require_part_metadata;
14091410

src/Storages/StorageReplicatedMergeTree.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
406406
friend class ReplicatedMergeMutateTaskBase;
407407
friend class ExportPartitionManifestUpdatingTask;
408408
friend class ExportPartitionTaskScheduler;
409+
friend class ExportPartFromPartitionExportTask;
409410

410411
using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker;
411412
using LogEntry = ReplicatedMergeTreeLogEntry;

0 commit comments

Comments
 (0)