Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent resuming a session that has not been fully shut down #7160

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/realm.h
Original file line number Diff line number Diff line change
Expand Up @@ -3374,6 +3374,10 @@ typedef enum realm_sync_error_action {
RLM_SYNC_ERROR_ACTION_CLIENT_RESET_NO_RECOVERY,
RLM_SYNC_ERROR_ACTION_MIGRATE_TO_FLX,
RLM_SYNC_ERROR_ACTION_REVERT_TO_PBS,
RLM_SYNC_ERROR_ACTION_REFRESH_USER,
RLM_SYNC_ERROR_ACTION_REFRESH_LOCATION,
RLM_SYNC_ERROR_ACTION_LOG_OUT_USER,
RLM_SYNC_ERROR_ACTION_BACKUP_THEN_DELETE_REALM,
} realm_sync_error_action_e;

typedef struct realm_sync_session realm_sync_session_t;
Expand Down Expand Up @@ -3873,7 +3877,7 @@ RLM_API void realm_sync_session_wait_for_upload_completion(realm_sync_session_t*
*/
RLM_API void realm_sync_session_handle_error_for_testing(const realm_sync_session_t* session,
realm_errno_e error_code, const char* error_str,
bool is_fatal);
bool is_fatal, realm_sync_error_action_e action);

/**
* In case of exception thrown in user code callbacks, this api will allow the sdk to store the user code exception
Expand Down
13 changes: 10 additions & 3 deletions src/realm/object-store/c_api/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::MigrateToFLX)
RLM_SYNC_ERROR_ACTION_MIGRATE_TO_FLX);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::RevertToPBS) ==
RLM_SYNC_ERROR_ACTION_REVERT_TO_PBS);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::RefreshUser) ==
RLM_SYNC_ERROR_ACTION_REFRESH_USER);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::RefreshLocation) ==
RLM_SYNC_ERROR_ACTION_REFRESH_LOCATION);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::LogOutUser) == RLM_SYNC_ERROR_ACTION_LOG_OUT_USER);
static_assert(realm_sync_error_action_e(ProtocolErrorInfo::Action::BackupThenDeleteRealm) ==
RLM_SYNC_ERROR_ACTION_BACKUP_THEN_DELETE_REALM);

static_assert(realm_flx_sync_subscription_set_state_e(SubscriptionSet::State::Pending) ==
RLM_SYNC_SUBSCRIPTION_PENDING);
Expand Down Expand Up @@ -834,12 +841,12 @@ RLM_API void realm_sync_session_wait_for_upload_completion(realm_sync_session_t*

RLM_API void realm_sync_session_handle_error_for_testing(const realm_sync_session_t* session,
realm_errno_e error_code, const char* error_str,
bool is_fatal)
bool is_fatal, realm_sync_error_action_e action)
{
REALM_ASSERT(session);
SyncSession::OnlyForTesting::handle_error(
*session->get(),
sync::SessionErrorInfo{Status{static_cast<ErrorCodes::Error>(error_code), error_str}, !is_fatal});
*session->get(), sync::SessionErrorInfo{Status{static_cast<ErrorCodes::Error>(error_code), error_str},
!is_fatal, static_cast<ProtocolErrorInfo::Action>(action)});
}

} // namespace realm::c_api
5 changes: 0 additions & 5 deletions src/realm/object-store/sync/impl/sync_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ struct SyncClient {
}
#endif

void cancel_reconnect_delay()
{
m_client.cancel_reconnect_delay();
}

void stop()
{
m_client.shutdown();
Expand Down
167 changes: 80 additions & 87 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
sync::SessionErrorInfo synthetic(
Status{ErrorCodes::AutoClientResetFailed,
util::format("A fatal error occurred during client reset: '%1'", status.reason())},
sync::IsFatal{true});
sync::IsFatal{true}, sync::ProtocolErrorInfo::Action::BackupThenDeleteRealm);
handle_error(synthetic);
return;
}
Expand Down Expand Up @@ -644,93 +644,85 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
util::Optional<ShouldBackup> delete_file;
bool log_out_user = false;
bool unrecognized_by_client = false;

if (error.status == ErrorCodes::AutoClientResetFailed) {
// At this point, automatic recovery has been attempted but it failed.
// Fallback to a manual reset and let the user try to handle it.
next_state = NextStateAfterError::inactive;
delete_file = ShouldBackup::yes;
}
else if (error.server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
switch (error.server_requests_action) {
case sync::ProtocolErrorInfo::Action::NoAction:
REALM_UNREACHABLE(); // This is not sent by the MongoDB server
case sync::ProtocolErrorInfo::Action::ApplicationBug:
[[fallthrough]];
case sync::ProtocolErrorInfo::Action::ProtocolViolation:
break;
case sync::ProtocolErrorInfo::Action::Warning:
break; // not fatal, but should be bubbled up to the user below.
case sync::ProtocolErrorInfo::Action::Transient:
// Not real errors, don't need to be reported to the binding.
return;
case sync::ProtocolErrorInfo::Action::DeleteRealm:
next_state = NextStateAfterError::inactive;
delete_file = ShouldBackup::no;
break;
case sync::ProtocolErrorInfo::Action::ClientReset:
[[fallthrough]];
case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
switch (config(&SyncConfig::client_resync_mode)) {
case ClientResyncMode::Manual:
next_state = NextStateAfterError::inactive;
delete_file = ShouldBackup::yes;
break;
case ClientResyncMode::DiscardLocal:
[[fallthrough]];
case ClientResyncMode::RecoverOrDiscard:
[[fallthrough]];
case ClientResyncMode::Recover:
download_fresh_realm(error.server_requests_action);
return; // do not propgate the error to the user at this point
}
break;
case sync::ProtocolErrorInfo::Action::MigrateToFLX:
// Should not receive this error if original sync config is FLX
REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
// Original config was PBS, migrating to FLX
m_migration_store->migrate_to_flx(*error.migration_query_string,
m_original_sync_config->partition_value);
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
bool unrecognized_by_client = error.status == ErrorCodes::UnknownError;

switch (error.server_requests_action) {
case sync::ProtocolErrorInfo::Action::NoAction:
REALM_UNREACHABLE();
case sync::ProtocolErrorInfo::Action::ApplicationBug:
[[fallthrough]];
case sync::ProtocolErrorInfo::Action::ProtocolViolation:
next_state = NextStateAfterError::inactive;
break;
case sync::ProtocolErrorInfo::Action::Warning:
break; // not fatal, but should be bubbled up to the user below.
case sync::ProtocolErrorInfo::Action::Transient:
// Not real errors, don't need to be reported to the binding.
return;
case sync::ProtocolErrorInfo::Action::BackupThenDeleteRealm:
next_state = NextStateAfterError::inactive;
delete_file = ShouldBackup::yes;
break;
case sync::ProtocolErrorInfo::Action::DeleteRealm:
next_state = NextStateAfterError::inactive;
delete_file = ShouldBackup::no;
break;
case sync::ProtocolErrorInfo::Action::ClientReset:
[[fallthrough]];
case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
switch (config(&SyncConfig::client_resync_mode)) {
case ClientResyncMode::Manual:
next_state = NextStateAfterError::inactive;
delete_file = ShouldBackup::yes;
break;
case ClientResyncMode::DiscardLocal:
[[fallthrough]];
case ClientResyncMode::RecoverOrDiscard:
[[fallthrough]];
case ClientResyncMode::Recover:
download_fresh_realm(error.server_requests_action);
return; // do not propgate the error to the user at this point
}
break;
case sync::ProtocolErrorInfo::Action::MigrateToFLX:
// Should not receive this error if original sync config is FLX
REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
// Original config was PBS, migrating to FLX
m_migration_store->migrate_to_flx(*error.migration_query_string, m_original_sync_config->partition_value);
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
return;
case sync::ProtocolErrorInfo::Action::RevertToPBS:
// If the client was updated to use FLX natively, but the server was rolled back to PBS,
// the server should be sending switch_to_flx_sync; throw exception if this error is not
// received.
if (m_original_sync_config->flx_sync_requested) {
throw LogicError(ErrorCodes::InvalidServerResponse,
"Received 'RevertToPBS' from server after rollback while client is natively "
"using FLX - expected 'SwitchToPBS'");
}
// Original config was PBS, rollback the migration
m_migration_store->rollback_to_pbs();
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
return;
case sync::ProtocolErrorInfo::Action::RefreshUser:
if (auto u = user()) {
u->refresh_custom_data(false, handle_refresh(shared_from_this(), false));
return;
case sync::ProtocolErrorInfo::Action::RevertToPBS:
// If the client was updated to use FLX natively, but the server was rolled back to PBS,
// the server should be sending switch_to_flx_sync; throw exception if this error is not
// received.
if (m_original_sync_config->flx_sync_requested) {
throw LogicError(ErrorCodes::InvalidServerResponse,
"Received 'RevertToPBS' from server after rollback while client is natively "
"using FLX - expected 'SwitchToPBS'");
}
// Original config was PBS, rollback the migration
m_migration_store->rollback_to_pbs();
save_sync_config_after_migration_or_rollback();
download_fresh_realm(error.server_requests_action);
}
break;
case sync::ProtocolErrorInfo::Action::RefreshLocation:
if (auto u = user()) {
u->refresh_custom_data(true, handle_refresh(shared_from_this(), true));
return;
case sync::ProtocolErrorInfo::Action::RefreshUser:
if (auto u = user()) {
u->refresh_custom_data(false, handle_refresh(shared_from_this(), false));
return;
}
break;
case sync::ProtocolErrorInfo::Action::RefreshLocation:
if (auto u = user()) {
u->refresh_custom_data(true, handle_refresh(shared_from_this(), true));
return;
}
break;
case sync::ProtocolErrorInfo::Action::LogOutUser:
next_state = NextStateAfterError::inactive;
log_out_user = true;
break;
}
}
else {
// Unrecognized error code.
unrecognized_by_client = true;
}
break;
case sync::ProtocolErrorInfo::Action::LogOutUser:
next_state = NextStateAfterError::inactive;
log_out_user = true;
break;
}

util::CheckedUniqueLock lock(m_state_mutex);
Expand All @@ -743,7 +735,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
update_error_and_mark_file_for_deletion(sync_error, *delete_file);

if (m_state == State::Dying && error.is_fatal) {
become_inactive(std::move(lock), error.status);
become_inactive(std::move(lock), sync_error.status);
return;
}

Expand All @@ -760,6 +752,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
}
break;
case NextStateAfterError::inactive: {
m_session->mark_unresumable();
become_inactive(std::move(lock), sync_error.status);
break;
}
Expand Down
22 changes: 20 additions & 2 deletions src/realm/sync/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener

std::string get_appservices_connection_id();

void mark_unresumable();

private:
ClientImpl& m_client;
DBRef m_db;
Expand Down Expand Up @@ -197,6 +199,7 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
bool m_force_closed = false;

bool m_suspended = false;
bool m_resumable = true;

// Has the SessionWrapper been finalized?
bool m_finalized = false;
Expand Down Expand Up @@ -993,8 +996,8 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data
auto action = m_wrapper.m_debug_hook(data);
switch (action) {
case realm::SyncClientHookAction::SuspendWithRetryableError: {
SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false});
err_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false},
ProtocolErrorInfo::Action::Transient);

auto err_processing_err = receive_error_message(err_info);
REALM_ASSERT_EX(err_processing_err.is_ok(), err_processing_err);
Expand Down Expand Up @@ -1322,6 +1325,11 @@ void SessionWrapper::cancel_reconnect_delay()
if (REALM_UNLIKELY(!self->m_sess))
return; // Already finalized
SessionImpl& sess = *self->m_sess;
if (!self->m_resumable) {
sess.logger.debug("Cannot resume a session that has received a fatal error");
return;
}

sess.cancel_resumption_delay(); // Throws
ClientImpl::Connection& conn = sess.get_connection();
conn.cancel_reconnect_delay(); // Throws
Expand Down Expand Up @@ -1860,6 +1868,11 @@ std::string SessionWrapper::get_appservices_connection_id()
return pf.future.get();
}

void SessionWrapper::mark_unresumable()
{
m_resumable = false;
}

// ################ ClientImpl::Connection ################

ClientImpl::Connection::Connection(ClientImpl& client, connection_ident_type ident, ServerEndpoint endpoint,
Expand Down Expand Up @@ -2106,6 +2119,11 @@ std::string Session::get_appservices_connection_id()
return m_impl->get_appservices_connection_id();
}

void Session::mark_unresumable()
{
m_impl->mark_unresumable();
}

std::ostream& operator<<(std::ostream& os, ProxyConfig::Type proxyType)
{
switch (proxyType) {
Expand Down
7 changes: 7 additions & 0 deletions src/realm/sync/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,13 @@ class Session {
/// with the error.
std::string get_appservices_connection_id();

/// Marks the session as un-resumable after a fatal error.
///
/// This function is not thread-safe and should be called from the connection-state
/// listener callback if the Session should not initiate a re-connect/resume after
/// a fatal error.
void mark_unresumable();

private:
SessionWrapper* m_impl = nullptr;

Expand Down
4 changes: 2 additions & 2 deletions src/realm/sync/client_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ struct SessionErrorInfo : public ProtocolErrorInfo {
{
}

SessionErrorInfo(Status status, IsFatal is_fatal)
: ProtocolErrorInfo(0, {}, is_fatal)
SessionErrorInfo(Status status, IsFatal is_fatal, Action error_action)
: ProtocolErrorInfo(0, {}, is_fatal, error_action)
, status(std::move(status))
{
}
Expand Down
1 change: 1 addition & 0 deletions src/realm/sync/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ enum class SyncClientHookEvent {
BootstrapMessageProcessed,
BootstrapProcessed,
ErrorMessageReceived,
SessionSuspended,
};

enum class SyncClientHookAction {
Expand Down
Loading
Loading