Skip to content

lock_object_storage_task_distribution_ms setting #866

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: antalya-25.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
In object storage distribution queries, do not distribute tasks to non-prefetched nodes while the prefetched node is still active, i.e. it requested a task less than this many milliseconds ago. Zero disables the lock.
)", EXPERIMENTAL) \
\

Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster", "", "", "New setting"},
{"object_storage_max_nodes", 0, 0, "New setting"},
{"use_object_storage_list_objects_cache", true, false, "New setting."},
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.3",
{
Expand Down
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace Setting
{
extern const SettingsBool use_hive_partitioning;
extern const SettingsString object_storage_cluster;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
}

namespace ErrorCodes
Expand Down Expand Up @@ -386,7 +387,10 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
}
}

auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
iterator,
ids_of_hosts,
local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]);

auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica) mutable -> String {
Expand Down
19 changes: 18 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Storages/VirtualColumnUtils.h>
Expand Down Expand Up @@ -430,16 +431,32 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
ObjectInfoPtr object_info;
auto query_settings = configuration->getQuerySettings(context_);

bool not_a_path = false;

do
{
not_a_path = false;
object_info = file_iterator->next(processor);

if (!object_info || object_info->getPath().empty())
return {};

StorageObjectStorageStableTaskDistributor::CommandInTaskResponse command(object_info->getPath());
if (command.is_parsed())
{
auto retry_after_us = command.get_retry_after_us();
if (retry_after_us.has_value())
{
not_a_path = true;
/// TODO: Make asynchronous waiting without sleep in thread
sleepForMicroseconds(std::min(100000ul, retry_after_us.value()));
continue;
}
}

object_info->loadMetadata(object_storage);
}
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));

QueryPipelineBuilder builder;
std::shared_ptr<ISource> source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
#include <consistent_hashing.h>
#include <optional>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSONException.h>

namespace DB
{

StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_)
std::vector<std::string> ids_of_nodes_,
uint64_t lock_object_storage_task_distribution_ms_)
: iterator(std::move(iterator_))
, connection_to_files(ids_of_nodes_.size())
, ids_of_nodes(ids_of_nodes_)
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
, iterator_exhausted(false)
{
}
Expand All @@ -24,6 +30,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
number_of_current_replica
);

saveLastNodeActivity(number_of_current_replica);

// 1. Check pre-queued files first
if (auto file = getPreQueuedFile(number_of_current_replica))
return file;
Expand Down Expand Up @@ -148,7 +156,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
// Queue file for its assigned replica
{
std::lock_guard lock(mutex);
unprocessed_files.insert(file_path);
unprocessed_files[file_path] = number_of_current_replica;
connection_to_files[file_replica_idx].push_back(file_path);
}
}
Expand All @@ -158,25 +166,96 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile

std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
{
/// Limit time of node activity to keep task in queue
Poco::Timestamp activity_limit;
Poco::Timestamp oldest_activity;
if (lock_object_storage_task_distribution_us)
activity_limit -= lock_object_storage_task_distribution_us;

std::lock_guard lock(mutex);

if (!unprocessed_files.empty())
{
auto it = unprocessed_files.begin();
String next_file = *it;
unprocessed_files.erase(it);

while (it != unprocessed_files.end())
{
auto last_activity = last_node_activity.find(it->second);
if (!lock_object_storage_task_distribution_us
|| last_activity == last_node_activity.end()
|| activity_limit > last_activity->second)
{
String next_file = it->first;
unprocessed_files.erase(it);

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
);

return next_file;
}

oldest_activity = std::min(oldest_activity, last_activity->second);
++it;
}

LOG_TRACE(
log,
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
next_file,
number_of_current_replica
"No unprocessed file for replica {}, need to retry after {} us",
number_of_current_replica,
oldest_activity - activity_limit
);

return next_file;
/// All unprocessed files are owned by alive replicas with recent activity.
/// Need to retry after (oldest_activity - activity_limit) microseconds.
CommandInTaskResponse response;
response.set_retry_after_us(oldest_activity - activity_limit);
return response.to_string();
}

return std::nullopt;
}

void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
{
Poco::Timestamp now;
std::lock_guard lock(mutex);
last_node_activity[number_of_current_replica] = now;
}

/// Tries to interpret a task string as a JSON command.
///
/// The same string channel carries both plain object paths and serialized commands (see to_string()),
/// so a failure to parse is an expected outcome, not an error: the object is then left with
/// is_parsed() == false and no retry_after_us value. The constructor does not propagate Poco parsing
/// or conversion exceptions.
///
/// @param task  Either an object path or a JSON object such as {"retry_after_us": 1000}.
StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
    /// A command is always a JSON object, so anything whose first significant character is not '{'
    /// is an ordinary path. Checking this first avoids throwing and catching a parser exception
    /// for every file that is read.
    const auto first_significant = task.find_first_not_of(" \t\r\n");
    if (first_significant == std::string::npos || task[first_significant] != '{')
        return;

    try
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (!json)
            return;

        /// Read all fields into locals first, so a conversion failure cannot leave
        /// the object marked as parsed with only part of its state filled in.
        std::optional<uint64_t> parsed_retry_after_us;
        if (json->has("retry_after_us"))
            parsed_retry_after_us = json->getValue<size_t>("retry_after_us");

        retry_after_us = parsed_retry_after_us;
        successfully_parsed = true;
    }
    catch (const Poco::Exception &)
    {
        /// Not a command: malformed JSON (JSONException / SyntaxException), a JSON value that is not
        /// an object (BadCastException), or a field of an unexpected type. Treat the string as a path.
    }
}

std::string StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::to_string() const
{
Poco::JSON::Object json;
if (retry_after_us.has_value())
json.set("retry_after_us", retry_after_us.value());

std::ostringstream oss;
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
#include <Interpreters/Cluster.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>

#include <Poco/Timestamp.h>

#include <unordered_set>
#include <unordered_map>
#include <vector>
#include <mutex>
#include <memory>
Expand All @@ -16,9 +20,28 @@ namespace DB
class StorageObjectStorageStableTaskDistributor
{
public:
/// A control message that the task distributor can return in place of an object path.
/// It is serialized to a JSON string with to_string() and recognized on the reading side
/// by constructing it from the received task string.
class CommandInTaskResponse
{
public:
    /// Creates an empty command: not parsed, no retry delay.
    CommandInTaskResponse() = default;

    /// Tries to parse `task` as a JSON command; is_parsed() tells whether that succeeded.
    /// Explicit, so that an arbitrary string (e.g. an object path) is never silently converted into a command.
    explicit CommandInTaskResponse(const std::string & task);

    /// True only if this object was constructed from a string that was recognized as a JSON command.
    bool is_parsed() const { return successfully_parsed; }

    /// Sets the delay, in microseconds, after which the task should be requested again.
    void set_retry_after_us(uint64_t time_us) { retry_after_us = time_us; }

    /// Serializes the command to a JSON string.
    std::string to_string() const;

    /// Returns the retry delay in microseconds, or std::nullopt if none is set.
    std::optional<uint64_t> get_retry_after_us() const { return retry_after_us; }

private:
    bool successfully_parsed = false;
    std::optional<uint64_t> retry_after_us;
};

StorageObjectStorageStableTaskDistributor(
std::shared_ptr<IObjectIterator> iterator_,
std::vector<std::string> ids_of_nodes_);
std::vector<std::string> ids_of_nodes_,
uint64_t lock_object_storage_task_distribution_ms_);

std::optional<String> getNextTask(size_t number_of_current_replica);

Expand All @@ -28,12 +51,17 @@ class StorageObjectStorageStableTaskDistributor
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);

void saveLastNodeActivity(size_t number_of_current_replica);

std::shared_ptr<IObjectIterator> iterator;

std::vector<std::vector<String>> connection_to_files;
std::unordered_set<String> unprocessed_files;
/// Map of unprocessed files in format filename => number of prefetched replica
std::unordered_map<String, size_t> unprocessed_files;

std::vector<std::string> ids_of_nodes;
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;

std::mutex mutex;
bool iterator_exhausted = false;
Expand Down
Loading
Loading