
Restart cluster tasks on connection lost #780


Open
wants to merge 1 commit into base: antalya-25.3

Conversation


@ianton-ru ianton-ru commented May 14, 2025

Changelog category (leave one):

  • Experimental Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Restart loading of objects on other nodes when one node goes down during a cluster request.

Documentation entry for user-facing changes

When a swarm node takes some tasks to execute and then goes down, and no data from that node has been processed yet, its tasks can be returned to the common queue to be executed on another node, if any is still alive.
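The rescheduling idea described above can be sketched roughly as follows. This is a simplified illustration, not the actual ClickHouse code: `TaskDistributorSketch` and its members are hypothetical names standing in for the real `StorageObjectStorageStableTaskDistributor` state.

```cpp
#include <cassert>
#include <list>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Hypothetical sketch: files handed to a replica are tracked per replica;
// if the replica is lost before the initiator received any of its data,
// the replica's files go back into the shared unprocessed set.
struct TaskDistributorSketch
{
    std::unordered_set<std::string> unprocessed_files;
    std::unordered_map<size_t, std::list<std::string>> files_in_flight; // replica -> files

    void assign(size_t replica, const std::string & file)
    {
        unprocessed_files.erase(file);
        files_in_flight[replica].push_back(file);
    }

    void onReplicaLost(size_t replica)
    {
        auto it = files_in_flight.find(replica);
        if (it == files_in_flight.end())
            return;
        // Return every in-flight file of the lost replica to the common queue.
        for (const auto & file : it->second)
            unprocessed_files.insert(file);
        files_in_flight.erase(it);
    }
};
```

Other replicas would then pick those files up through the normal unprocessed-file lookup, as in the existing `getAnyUnprocessedFile` path.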

Did not manage to write an automated test; tested manually by adding a sleep in the code so a replica could be killed within that time slot.

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

@ianton-ru ianton-ru force-pushed the feature/retries_in_cluster_functions branch 2 times, most recently from 69cce89 to fe4eee1 Compare May 19, 2025 13:23
@ianton-ru ianton-ru marked this pull request as ready for review May 21, 2025 18:01
Collaborator

@arthurpassos arthurpassos left a comment


First round

@@ -179,4 +199,28 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
return std::nullopt;
}

void StorageObjectStorageStableTaskDistributor::rerunTasksForReplica(size_t number_of_current_replica)
Collaborator

If I understand correctly, this method re-schedules the tasks FROM a given replica to another one. Is that correct?

If that's so, I would name it differently:

rescheduleTasksFromReplica

Author

Yes, it adds the replica's tasks back to the unprocessed list so that other replicas can take them.

if (processed_file_list_ptr == processed_files.end())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Replica number {} was marked as lost, can't set satk for it anymore",
Collaborator

typo

@@ -34,6 +39,7 @@ class StorageObjectStorageStableTaskDistributor
std::unordered_set<String> unprocessed_files;

std::vector<std::string> ids_of_nodes;
std::unordered_map<size_t, std::list<String>> processed_files;
Collaborator

replica_to_files_to_be_processed_map or something like this.

imo

  1. The name should mention that it is a mapping from replica to a list of files
  2. It can't be "processed" (past tense) because the files haven't been processed yet, as far as I understand

read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
}
catch (const Exception &)
Collaborator

Shouldn't you catch only network-related exceptions? Or maybe the question is: is there a non-connection-loss related exception that could be thrown?

Author

@ianton-ru ianton-ru May 22, 2025

Only network exceptions, maybe.
For others I'm not sure. If a replica sent an exception because its data is corrupted (which can be an unpredictable error), another replica with the same data would get the same error. It may be possible to reschedule a task for some specific exceptions, but we would need to know those specific cases, and I don't know them right now. If we find something, we can add it later.

Collaborator

@arthurpassos arthurpassos left a comment


A few questions that come to mind right now:

  1. Does a replica ever get reconnected after being marked as lost?
  2. As far as I understand, once a replica is assigned a file, it'll process that file and return the result before picking the next file. Therefore, StorageObjectStorageStableTaskDistributor::processed_files should be updated, shouldn't it?

@@ -8,6 +8,8 @@
#include <Interpreters/StorageID.h>
#include <sys/types.h>

#include <list>
Collaborator

Do you need this header here?

Author

Forgot to remove it, thanks.

@ianton-ru
Author

  1. Does a replica ever get reconnected after being marked as lost?
    Not within this query. The problem exists only when a replica has already taken some files to process and goes down before all the data is processed. We can reschedule a task only when the initiator has not received any part of the data, because received parts are merged into the response, and it is impossible to "unmerge" them to process the whole file again; we also have no information to process only the unprocessed part of a file on another node.
  2. As far as I understand, once a replica is assigned a file, it'll process that file and return the result before picking the next file. Therefore, StorageObjectStorageStableTaskDistributor::processed_files should be updated, shouldn't it?
    Each replica takes several files (4 by default) to process in parallel. And partial responses carry no information about which file they came from. So it is impossible to separate the data of completed files from the data of non-completed ones, and once the initiator has received something, it is impossible to reschedule the tasks.

@ianton-ru
Author

Can be rebased on the antalya-25.3 branch only after #797 is merged; the feature depends on rendezvous hashing (uses the same class).

@svb-alt svb-alt linked an issue May 29, 2025 that may be closed by this pull request
@ianton-ru ianton-ru force-pushed the feature/retries_in_cluster_functions branch from d381de9 to c3afad3 Compare June 5, 2025 09:44
@ianton-ru ianton-ru changed the base branch from antalya to antalya-25.3 June 5, 2025 09:44
@ianton-ru
Author

Fixed after review and rebased

@Enmk Enmk added antalya-25.3.3 swarms Antalya Roadmap: Swarms labels Jun 19, 2025

Successfully merging this pull request may close these issues.

Retries in cluster requests
4 participants