-
Notifications
You must be signed in to change notification settings - Fork 6
Restart cluster tasks on connection lost #780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: antalya-25.3
Are you sure you want to change the base?
Conversation
69cce89
to
fe4eee1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First round
@@ -179,4 +199,28 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess | |||
return std::nullopt; | |||
} | |||
|
|||
void StorageObjectStorageStableTaskDistributor::rerunTasksForReplica(size_t number_of_current_replica) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand correctly, this method re-schedules the tasks FROM a given replica to another one. Is that correct?
If that's so, I would name it differently:
rescheduleTasksFromReplica
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it adds tasks from replica to unprocessed list to allow other replicas take them.
if (processed_file_list_ptr == processed_files.end()) | ||
throw Exception( | ||
ErrorCodes::LOGICAL_ERROR, | ||
"Replica number {} was marked as lost, can't set satk for it anymore", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo
@@ -34,6 +39,7 @@ class StorageObjectStorageStableTaskDistributor | |||
std::unordered_set<String> unprocessed_files; | |||
|
|||
std::vector<std::string> ids_of_nodes; | |||
std::unordered_map<size_t, std::list<String>> processed_files; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replica_to_files_to_be_processed_map or something like this.
imo
- It needs to mention it is a mapping from replica to list of files
- it can't be "processed" as it is in the past tense because the files haven't been processed yet as far as I understand
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); | ||
read_context.has_read_packet_part = PacketPart::Body; | ||
} | ||
catch (const Exception &) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't you catch only network related exceptions? Or maybe the question is: is there an non-connection-loss related exceptionthat could be thrown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only network exceptions - may be.
Others - not sure, It replica sent an exception because data corrupted (it may be unpredictable error), other replica with the same data gets the same error. May be possible to reschedule task with some specific exceptions, but need to know this specific cases. I don't know right now. If we found something, we can add this later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few questions that come to mind right now:
- Does a replica ever gets reconnected after being marked as lost?
- As far as I understand, once a replica is assigned a file, it'll process that file and return the result before picking the next file. Therefore,
StorageObjectStorageStableTaskDistributor::processed_files
should be updated, shouldn't it?
@@ -8,6 +8,8 @@ | |||
#include <Interpreters/StorageID.h> | |||
#include <sys/types.h> | |||
|
|||
#include <list> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need this header here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forget to remove, thanks
|
Can be rebased on antalya-25.3 branch only after merge of #797, feature depends on rendezvous hashing (uses same class). |
d381de9
to
c3afad3
Compare
Fixed after review and rebased |
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Restart loading objects on other nodes when one node down in cluster request.
Documentation entry for user-facing changes
When swarm node takes some tasks to execute and goes down, if no data from this node were processed, task can be returned back to common queue to be executed on other node, if any still alive.
Did not managed to make test, tested with adding sleep in code to be able to kill some replica in that time slot.
Exclude tests: