Skip to content

Commit

Permalink
Treat completing a client reset as receiving a MARK message
Browse files Browse the repository at this point in the history
  • Loading branch information
tgoyne committed Jul 31, 2024
1 parent 819bb98 commit 4cc2831
Show file tree
Hide file tree
Showing 23 changed files with 449 additions and 351 deletions.
1 change: 1 addition & 0 deletions src/realm/sync/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ set(NOINST_HEADERS
noinst/integer_codec.hpp
noinst/migration_store.hpp
noinst/pending_bootstrap_store.hpp
noinst/pending_reset_store.hpp
noinst/protocol_codec.hpp
noinst/root_certs.hpp
noinst/sync_metadata_schema.hpp
Expand Down
59 changes: 5 additions & 54 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
// Can be called from any thread.
util::Future<std::string> send_test_command(std::string body);

void handle_pending_client_reset_acknowledgement();

// Can be called from any thread.
std::string get_appservices_connection_id();

Expand Down Expand Up @@ -779,14 +777,6 @@ void SessionImpl::on_resumed()
}
}

void SessionImpl::handle_pending_client_reset_acknowledgement()
{
// Ignore the call if the session is not active
if (m_state == State::Active) {
m_wrapper.handle_pending_client_reset_acknowledgement();
}
}

bool SessionImpl::process_flx_bootstrap_message(const DownloadMessage& message)
{
// Ignore the message if the session is not active or a steady state message
Expand Down Expand Up @@ -1354,8 +1344,12 @@ void SessionWrapper::actualize()
}
}

if (!m_client_reset_config)
if (!m_client_reset_config) {
check_progress(); // Throws
if (auto pending_reset = PendingResetStore::has_pending_reset(*m_db->start_frozen())) {
m_sess->logger.info(util::LogCategory::reset, "Found pending client reset tracker: %1", *pending_reset);
}
}
}

void SessionWrapper::force_close()
Expand Down Expand Up @@ -1651,49 +1645,6 @@ util::Future<std::string> SessionWrapper::send_test_command(std::string body)
return m_sess->send_test_command(std::move(body));
}

void SessionWrapper::handle_pending_client_reset_acknowledgement()
{
REALM_ASSERT(!m_finalized);

auto has_pending_reset = PendingResetStore::has_pending_reset(m_db->start_frozen());
if (!has_pending_reset) {
return; // nothing to do
}

m_sess->logger.info(util::LogCategory::reset, "Tracking %1", *has_pending_reset);

// Now that the client reset merge is complete, wait for the changes to synchronize with the server
async_wait_for(
true, true, [self = util::bind_ptr(this), pending_reset = std::move(*has_pending_reset)](Status status) {
if (status == ErrorCodes::OperationAborted) {
return;
}
auto& logger = self->m_sess->logger;
if (!status.is_ok()) {
logger.error(util::LogCategory::reset, "Error while tracking client reset acknowledgement: %1",
status);
return;
}

logger.debug(util::LogCategory::reset, "Server has acknowledged %1", pending_reset);

auto tr = self->m_db->start_write();
auto cur_pending_reset = PendingResetStore::has_pending_reset(tr);
if (!cur_pending_reset) {
logger.debug(util::LogCategory::reset, "Client reset cycle detection tracker already removed.");
return;
}
if (*cur_pending_reset == pending_reset) {
logger.debug(util::LogCategory::reset, "Removing client reset cycle detection tracker.");
}
else {
logger.info(util::LogCategory::reset, "Found new %1", cur_pending_reset);
}
PendingResetStore::clear_pending_reset(tr);
tr->commit();
});
}

std::string SessionWrapper::get_appservices_connection_id()
{
auto pf = util::make_promise_future<std::string>();
Expand Down
2 changes: 0 additions & 2 deletions src/realm/sync/network/default_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
initiate_resolve();
}

virtual ~DefaultWebSocketImpl() = default;

void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
{
m_websocket.async_write_binary(data.data(), data.size(),
Expand Down
28 changes: 21 additions & 7 deletions src/realm/sync/noinst/client_history_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <realm/sync/instruction_replication.hpp>
#include <realm/sync/noinst/client_reset.hpp>
#include <realm/sync/noinst/client_reset_recovery.hpp>
#include <realm/sync/noinst/pending_reset_store.hpp>
#include <realm/transaction.hpp>
#include <realm/util/compression.hpp>
#include <realm/util/features.h>
Expand Down Expand Up @@ -335,16 +336,15 @@ void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, boo
}


// Overriding member function in realm::sync::ClientHistoryBase
void ClientHistory::set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
VersionInfo& version_info)
VersionInfo& version_info, util::Logger& logger)
{
TransactionRef wt = m_db->start_write(); // Throws
version_type local_version = wt->get_version();
ensure_updated(local_version); // Throws
prepare_for_write(); // Throws

update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws

// Note: This transaction produces an empty changeset. Empty changesets are
// not uploaded to the server.
Expand Down Expand Up @@ -489,17 +489,17 @@ void ClientHistory::integrate_server_changesets(
// During the bootstrap phase in flexible sync, the server sends multiple download messages with the same
// synthetic server version that represents synthetic changesets generated from state on the server.
if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) {
update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws
}
// Always update progress for download messages from steady state.
else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) {
auto partial_progress = progress;
partial_progress.download.server_version = last_changeset.remote_version;
partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version;
update_sync_progress(partial_progress, downloadable_bytes); // Throws
update_sync_progress(partial_progress, downloadable_bytes, logger); // Throws
}
else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) {
update_sync_progress(progress, downloadable_bytes); // Throws
update_sync_progress(progress, downloadable_bytes, logger); // Throws
}
if (run_in_write_tr) {
run_in_write_tr(*transact, changesets_for_cb);
Expand Down Expand Up @@ -876,7 +876,8 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
}


void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes)
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
util::Logger& logger)
{
Array& root = m_arrays->root;

Expand Down Expand Up @@ -947,6 +948,19 @@ void ClientHistory::update_sync_progress(const SyncProgress& progress, Downloada
root.set(s_progress_uploaded_bytes_iip,
RefOrTagged::make_tagged(uploaded_bytes)); // Throws

if (previous_upload_client_version < progress.upload.client_version) {
// This is part of the client reset cycle detection.
// A client reset operation will write a flag to an internal table indicating that
// the changes there are a result of a successful reset. However, it is not possible to
// know if a recovery has been successful until the changes have been acknowledged by the
// server. The situation we want to avoid is that a recovery itself causes another reset
// which creates a reset cycle. However, at this point, upload progress has been made
// and we can remove the cycle detection flag if there is one.
if (PendingResetStore::clear_pending_reset(*m_group)) {
logger.info(util::LogCategory::reset, "Clearing pending reset tracker after upload completion.");
}
}

m_progress_download = progress.download;

trim_sync_history(); // Throws
Expand Down
5 changes: 3 additions & 2 deletions src/realm/sync/noinst/client_history_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ class ClientHistory final : public _impl::History, public TransformHistory {
/// \param downloadable_bytes If specified, and if the implementation cares
/// about byte-level progress, this function updates the persistent record
/// of the estimate of the number of remaining bytes to be downloaded.
void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&);
void set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes, VersionInfo&,
util::Logger& logger);

/// \brief Scan through the history for changesets to be uploaded.
///
Expand Down Expand Up @@ -421,7 +422,7 @@ class ClientHistory final : public _impl::History, public TransformHistory {
void prepare_for_write();
Replication::version_type add_changeset(BinaryData changeset, BinaryData sync_changeset);
void add_sync_history_entry(const HistoryEntry&);
void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes);
void update_sync_progress(const SyncProgress&, DownloadableProgress downloadable_bytes, util::Logger& logger);
void trim_ct_history();
void trim_sync_history();
void do_trim_sync_history(std::size_t n);
Expand Down
27 changes: 13 additions & 14 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,8 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
if (m_websocket_error_received)
return;

m_sending_session = sess;
m_sending = true;
m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
if (sentinel->destroyed) {
return;
Expand All @@ -991,8 +993,6 @@ void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
}
handle_write_message(); // Throws
}); // Throws
m_sending_session = sess;
m_sending = true;
}


Expand Down Expand Up @@ -1571,7 +1571,7 @@ void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast6
"received empty download message that was not the last in batch",
ProtocolError::bad_progress);
}
history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
history.set_sync_progress(progress, downloadable_bytes, version_info, logger); // Throws
return;
}

Expand Down Expand Up @@ -1718,9 +1718,6 @@ void Session::activate()
catch (...) {
on_integration_failure(IntegrationException(exception_to_status()));
}

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
}


Expand Down Expand Up @@ -2270,16 +2267,18 @@ bool Session::client_reset_if_needed()
m_progress.download.last_integrated_client_version);
REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);

m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
// Reset the cached values which are used to calculate progress since the
// last time sync completed
init_progress_handler();
// In recovery mode, there may be new changesets to upload and nothing left to download.
// In FLX DiscardLocal mode, there may be new commits due to subscription handling.
// For both, we want to allow uploads again without needing external changes to download first.
m_delay_uploads = false;

// Checks if there is a pending client reset
handle_pending_client_reset_acknowledgement();
// Update the download progress to match what it would have been if we'd
// received a MARK message from the server (as the fresh Realm which we used
// as the source data for the reset did).
m_upload_progress = m_progress.upload;
m_download_progress = m_progress.download;
m_server_version_at_last_download_mark = m_progress.download.server_version;
m_last_download_mark_received = m_last_download_mark_sent = m_target_download_mark;
check_for_download_completion();

// If a migration or rollback is in progress, mark it complete when client reset is completed.
if (auto migration_store = get_migration_store()) {
Expand Down
1 change: 0 additions & 1 deletion src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,6 @@ class ClientImpl::Session {
void process_pending_flx_bootstrap();

bool client_reset_if_needed();
void handle_pending_client_reset_acknowledgement();

void gather_pending_compensating_writes(util::Span<Changeset> changesets, std::vector<ProtocolErrorInfo>* out);

Expand Down
16 changes: 12 additions & 4 deletions src/realm/sync/noinst/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,14 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy
PendingReset::Action action, const std::optional<Status>& error,
util::Logger& logger)
{
if (auto previous_reset = sync::PendingResetStore::has_pending_reset(wt_local)) {
if (auto previous_reset = sync::PendingResetStore::has_pending_reset(*wt_local)) {
logger.info(util::LogCategory::reset, "Found a previous %1", *previous_reset);
if (action != previous_reset->action) {
// IF a different client reset is being performed, cler the pending client reset and start over.
logger.info(util::LogCategory::reset,
"New '%1' client reset of type: '%2' is incompatible - clearing previous reset", action,
mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
sync::PendingResetStore::clear_pending_reset(*wt_local);
}
else {
switch (previous_reset->mode) {
Expand All @@ -444,10 +444,10 @@ ClientResyncMode reset_precheck_guard(const TransactionRef& wt_local, ClientResy
util::LogCategory::reset,
"A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal",
previous_reset->mode, previous_reset->time, mode);
sync::PendingResetStore::clear_pending_reset(wt_local);
sync::PendingResetStore::clear_pending_reset(*wt_local);
break;
case ClientResyncMode::DiscardLocal:
sync::PendingResetStore::clear_pending_reset(wt_local);
sync::PendingResetStore::clear_pending_reset(*wt_local);
// previous mode Recover and this mode is Discard, this is not a cycle yet
break;
case ClientResyncMode::Manual:
Expand Down Expand Up @@ -546,6 +546,14 @@ bool perform_client_reset_diff(DB& db_local, sync::ClientReset& reset_config, ut
}
}

// If there was nothing to recover or recovery was disabled then immediately
// mark the client reset as successfully complete
if (recovered.empty()) {
logger.info(util::LogCategory::reset,
"Immediately removing client reset tracker as there are no recovered changesets to upload.");
sync::PendingResetStore::clear_pending_reset(*wt_local);
}

wt_local->commit_and_continue_as_read();

VersionID new_version_local = wt_local->get_version_of_current_transaction();
Expand Down
6 changes: 3 additions & 3 deletions src/realm/sync/noinst/migration_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ bool MigrationStore::load_data(bool read_only)

auto tr = m_db->start_read();
// Start with a reader so it doesn't try to write until we are ready
SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
SyncMetadataSchemaVersionsReader schema_versions_reader(*tr);
if (auto schema_version =
schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_migration_store)) {
schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_flx_migration_store)) {
if (*schema_version != c_schema_version) {
throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion,
"Invalid schema version for flexible sync migration store metadata");
}
load_sync_metadata_schema(tr, &internal_tables);
load_sync_metadata_schema(*tr, &internal_tables);
}
else {
if (read_only) {
Expand Down
6 changes: 3 additions & 3 deletions src/realm/sync/noinst/pending_bootstrap_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger,

auto tr = m_db->start_read();
// Start with a reader so it doesn't try to write until we are ready
SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
SyncMetadataSchemaVersionsReader schema_versions_reader(*tr);
if (auto schema_version =
schema_versions_reader.get_version_for(tr, internal_schema_groups::c_pending_bootstraps)) {
schema_versions_reader.get_version_for(*tr, internal_schema_groups::c_pending_bootstraps)) {
if (*schema_version != c_schema_version) {
throw RuntimeError(ErrorCodes::SchemaVersionMismatch,
"Invalid schema version for FLX sync pending bootstrap table group");
}
load_sync_metadata_schema(tr, &internal_tables);
load_sync_metadata_schema(*tr, &internal_tables);
}
else {
tr->promote_to_write();
Expand Down
Loading

0 comments on commit 4cc2831

Please sign in to comment.