Skip to content

Commit 3571738

Browse files
authored
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)
Updating the subscription store in a separate write transaction from the recovery means that we temporarily commit an invalid state. If the application crashes between committing the client reset diff and updating the subscription store, the next launch of the application would try to use the now-invalid pending subscriptions that should have been discarded.
1 parent 97523e7 commit 3571738

File tree

11 files changed

+183
-136
lines changed

11 files changed

+183
-136
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
### Fixed
88
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
9-
* None.
9+
* A crash at a very specific time during a DiscardLocal client reset on a FLX Realm could leave subscriptions in an invalid state ([#7110](https://github.com/realm/realm-core/pull/7110), since v12.3.0).
1010

1111
### Breaking changes
1212
* None.

src/realm/sync/client.cpp

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1785,10 +1785,7 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()
17851785
{
17861786
REALM_ASSERT(!m_finalized);
17871787

1788-
auto pending_reset = [&] {
1789-
auto ft = m_db->start_frozen();
1790-
return _impl::client_reset::has_pending_reset(ft);
1791-
}();
1788+
auto pending_reset = _impl::client_reset::has_pending_reset(*m_db->start_frozen());
17921789
REALM_ASSERT(pending_reset);
17931790
m_sess->logger.info("Tracking pending client reset of type \"%1\" from %2", pending_reset->type,
17941791
pending_reset->time);
@@ -1804,7 +1801,7 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()
18041801
}
18051802

18061803
auto wt = self->m_db->start_write();
1807-
auto cur_pending_reset = _impl::client_reset::has_pending_reset(wt);
1804+
auto cur_pending_reset = _impl::client_reset::has_pending_reset(*wt);
18081805
if (!cur_pending_reset) {
18091806
logger.debug(
18101807
"Was going to remove client reset tracker for type \"%1\" from %2, but it was already removed",
@@ -1821,7 +1818,7 @@ void SessionWrapper::handle_pending_client_reset_acknowledgement()
18211818
"Removing cycle detection tracker.",
18221819
pending_reset.type, pending_reset.time);
18231820
}
1824-
_impl::client_reset::remove_pending_client_resets(wt);
1821+
_impl::client_reset::remove_pending_client_resets(*wt);
18251822
wt->commit();
18261823
});
18271824
}

src/realm/sync/noinst/client_history_impl.cpp

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,23 @@ void ClientHistory::get_status(version_type& current_client_version, SaltedFileI
265265
current_client_version = 0;
266266

267267
if (has_pending_client_reset) {
268-
*has_pending_client_reset = _impl::client_reset::has_pending_reset(rt).has_value();
268+
*has_pending_client_reset = _impl::client_reset::has_pending_reset(*rt).has_value();
269269
}
270270
}
271271

272+
SaltedFileIdent ClientHistory::get_client_file_ident(const Transaction& rt) const
273+
{
274+
SaltedFileIdent client_file_ident{rt.get_sync_file_id(), 0};
275+
276+
using gf = _impl::GroupFriend;
277+
if (ref_type ref = gf::get_history_ref(rt)) {
278+
Array root(m_db->get_alloc());
279+
root.init_from_ref(ref);
280+
client_file_ident.salt = salt_type(root.get_as_ref_or_tagged(s_client_file_ident_salt_iip).get_as_int());
281+
}
282+
283+
return client_file_ident;
284+
}
272285

273286
void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, bool fix_up_object_ids)
274287
{

src/realm/sync/noinst/client_history_impl.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ class ClientHistory final : public _impl::History, public TransformHistory {
152152
/// when engaging in future synchronization sessions.
153153
void set_client_file_ident(SaltedFileIdent client_file_ident, bool fix_up_object_ids);
154154

155+
/// Gets the client file ident set with `set_client_file_ident`, or `{0, 0}` if it has never been set.
156+
SaltedFileIdent get_client_file_ident(const Transaction& tr) const;
157+
155158
/// Stores the synchronization progress in the associated Realm file in a
156159
/// way that makes it available via get_status() during future
157160
/// synchronization sessions. Progress is reported by the server in the

src/realm/sync/noinst/client_reset.cpp

Lines changed: 92 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -415,20 +415,16 @@ constexpr static std::string_view s_timestamp_col_name("event_time");
415415
constexpr static std::string_view s_reset_type_col_name("type_of_reset");
416416
constexpr int64_t metadata_version = 1;
417417

418-
void remove_pending_client_resets(TransactionRef wt)
418+
void remove_pending_client_resets(Transaction& wt)
419419
{
420-
REALM_ASSERT(wt);
421-
if (auto table = wt->get_table(s_meta_reset_table_name)) {
422-
if (table->size()) {
423-
table->clear();
424-
}
420+
if (auto table = wt.get_table(s_meta_reset_table_name); table && !table->is_empty()) {
421+
table->clear();
425422
}
426423
}
427424

428-
util::Optional<PendingReset> has_pending_reset(TransactionRef rt)
425+
util::Optional<PendingReset> has_pending_reset(const Transaction& rt)
429426
{
430-
REALM_ASSERT(rt);
431-
ConstTableRef table = rt->get_table(s_meta_reset_table_name);
427+
ConstTableRef table = rt.get_table(s_meta_reset_table_name);
432428
if (!table || table->size() == 0) {
433429
return util::none;
434430
}
@@ -466,14 +462,13 @@ util::Optional<PendingReset> has_pending_reset(TransactionRef rt)
466462
return pending;
467463
}
468464

469-
void track_reset(TransactionRef wt, ClientResyncMode mode)
465+
void track_reset(Transaction& wt, ClientResyncMode mode)
470466
{
471-
REALM_ASSERT(wt);
472467
REALM_ASSERT(mode != ClientResyncMode::Manual);
473-
TableRef table = wt->get_table(s_meta_reset_table_name);
468+
TableRef table = wt.get_table(s_meta_reset_table_name);
474469
ColKey version_col, timestamp_col, type_col;
475470
if (!table) {
476-
table = wt->add_table_with_primary_key(s_meta_reset_table_name, type_ObjectId, s_pk_col_name);
471+
table = wt.add_table_with_primary_key(s_meta_reset_table_name, type_ObjectId, s_pk_col_name);
477472
REALM_ASSERT(table);
478473
version_col = table->add_column(type_Int, s_version_column_name);
479474
timestamp_col = table->add_column(type_Timestamp, s_timestamp_col_name);
@@ -503,10 +498,9 @@ void track_reset(TransactionRef wt, ClientResyncMode mode)
503498
{type_col, mode_val}});
504499
}
505500

506-
static ClientResyncMode reset_precheck_guard(TransactionRef wt, ClientResyncMode mode, bool recovery_is_allowed,
501+
static ClientResyncMode reset_precheck_guard(Transaction& wt, ClientResyncMode mode, bool recovery_is_allowed,
507502
util::Logger& logger)
508503
{
509-
REALM_ASSERT(wt);
510504
if (auto previous_reset = has_pending_reset(wt)) {
511505
logger.info("A previous reset was detected of type: '%1' at: %2", previous_reset->type, previous_reset->time);
512506
switch (previous_reset->type) {
@@ -557,19 +551,71 @@ static ClientResyncMode reset_precheck_guard(TransactionRef wt, ClientResyncMode
557551
return mode;
558552
}
559553

560-
LocalVersionIDs perform_client_reset_diff(DBRef db_local, DBRef db_remote, sync::SaltedFileIdent client_file_ident,
554+
LocalVersionIDs perform_client_reset_diff(DB& db_local, DB& db_remote, sync::SaltedFileIdent client_file_ident,
561555
util::Logger& logger, ClientResyncMode mode, bool recovery_is_allowed,
562556
bool* did_recover_out, sync::SubscriptionStore* sub_store,
563557
util::UniqueFunction<void(int64_t)> on_flx_version_complete)
564558
{
565-
REALM_ASSERT(db_local);
566-
REALM_ASSERT(db_remote);
567-
logger.info("Client reset, path_local = %1, "
568-
"client_file_ident.ident = %2, "
569-
"client_file_ident.salt = %3,"
570-
"remote = %4, mode = %5, recovery_is_allowed = %6",
571-
db_local->get_path(), client_file_ident.ident, client_file_ident.salt, db_remote->get_path(), mode,
572-
recovery_is_allowed);
559+
auto wt_local = db_local.start_write();
560+
auto actual_mode = reset_precheck_guard(*wt_local, mode, recovery_is_allowed, logger);
561+
bool recover_local_changes =
562+
actual_mode == ClientResyncMode::Recover || actual_mode == ClientResyncMode::RecoverOrDiscard;
563+
564+
logger.info("Client reset: path_local = %1, "
565+
"client_file_ident = (ident: %2, salt: %3), "
566+
"remote_path = %4, requested_mode = %5, recovery_is_allowed = %6, "
567+
"actual_mode = %7, will_recover = %8",
568+
db_local.get_path(), client_file_ident.ident, client_file_ident.salt, db_remote.get_path(), mode,
569+
recovery_is_allowed, actual_mode, recover_local_changes);
570+
571+
auto& repl_local = dynamic_cast<ClientReplication&>(*db_local.get_replication());
572+
auto& history_local = repl_local.get_history();
573+
history_local.ensure_updated(wt_local->get_version());
574+
SaltedFileIdent orig_file_ident = history_local.get_client_file_ident(*wt_local);
575+
VersionID old_version_local = wt_local->get_version_of_current_transaction();
576+
577+
auto& repl_remote = dynamic_cast<ClientReplication&>(*db_remote.get_replication());
578+
auto& history_remote = repl_remote.get_history();
579+
580+
sync::SaltedVersion fresh_server_version = {0, 0};
581+
{
582+
SyncProgress remote_progress;
583+
sync::version_type remote_version_unused;
584+
SaltedFileIdent remote_ident_unused;
585+
history_remote.get_status(remote_version_unused, remote_ident_unused, remote_progress);
586+
fresh_server_version = remote_progress.latest_server_version;
587+
}
588+
589+
if (!recover_local_changes) {
590+
auto rt_remote = db_remote.start_read();
591+
// transform the local Realm such that all public tables become identical to the remote Realm
592+
transfer_group(*rt_remote, *wt_local, logger, false);
593+
594+
// now that the state of the fresh and local Realms are identical,
595+
// reset the local sync history and steal the fresh Realm's ident
596+
history_local.set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version,
597+
BinaryData());
598+
599+
int64_t subscription_version = 0;
600+
if (sub_store) {
601+
subscription_version = sub_store->set_active_as_latest(*wt_local);
602+
}
603+
604+
wt_local->commit_and_continue_as_read();
605+
if (did_recover_out) {
606+
*did_recover_out = false;
607+
}
608+
if (on_flx_version_complete) {
609+
on_flx_version_complete(subscription_version);
610+
}
611+
612+
VersionID new_version_local = wt_local->get_version_of_current_transaction();
613+
logger.info("perform_client_reset_diff is done: old_version = (version: %1, index: %2), "
614+
"new_version = (version: %3, index: %4)",
615+
old_version_local.version, old_version_local.index, new_version_local.version,
616+
new_version_local.index);
617+
return LocalVersionIDs{old_version_local, new_version_local};
618+
}
573619

574620
auto remake_active_subscription = [&]() {
575621
if (!sub_store) {
@@ -587,44 +633,16 @@ LocalVersionIDs perform_client_reset_diff(DBRef db_local, DBRef db_remote, sync:
587633
sub.version());
588634
};
589635

590-
auto frozen_pre_local_state = db_local->start_frozen();
591-
auto wt_local = db_local->start_write();
592-
auto history_local = dynamic_cast<ClientHistory*>(wt_local->get_replication()->_get_history_write());
593-
REALM_ASSERT(history_local);
594-
VersionID old_version_local = wt_local->get_version_of_current_transaction();
595-
wt_local->get_history()->ensure_updated(old_version_local.version);
596-
SaltedFileIdent orig_file_ident;
597-
{
598-
sync::version_type old_version_unused;
599-
SyncProgress old_progress_unused;
600-
history_local->get_status(old_version_unused, orig_file_ident, old_progress_unused);
601-
}
602-
std::vector<ClientHistory::LocalChange> local_changes;
603-
604-
mode = reset_precheck_guard(wt_local, mode, recovery_is_allowed, logger);
605-
bool recover_local_changes = (mode == ClientResyncMode::Recover || mode == ClientResyncMode::RecoverOrDiscard);
636+
auto frozen_pre_local_state = db_local.start_frozen();
637+
auto local_changes = history_local.get_local_changes(wt_local->get_version());
638+
logger.info("Local changesets to recover: %1", local_changes.size());
606639

607-
if (recover_local_changes) {
608-
local_changes = history_local->get_local_changes(wt_local->get_version());
609-
logger.info("Local changesets to recover: %1", local_changes.size());
610-
}
640+
auto wt_remote = db_remote.start_write();
611641

612-
sync::SaltedVersion fresh_server_version = {0, 0};
613-
auto wt_remote = db_remote->start_write();
614-
auto history_remote = dynamic_cast<ClientHistory*>(wt_remote->get_replication()->_get_history_write());
615-
REALM_ASSERT(history_remote);
616-
617-
SyncProgress remote_progress;
618-
{
619-
sync::version_type remote_version_unused;
620-
SaltedFileIdent remote_ident_unused;
621-
history_remote->get_status(remote_version_unused, remote_ident_unused, remote_progress);
622-
}
623-
fresh_server_version = remote_progress.latest_server_version;
624642
BinaryData recovered_changeset;
625643

626644
// FLX with recovery has to be done in multiple commits, which is significantly different than other modes
627-
if (recover_local_changes && sub_store) {
645+
if (sub_store) {
628646
// In FLX recovery, save a copy of the pending subscriptions for later. This
629647
// needs to be done before they are wiped out by remake_active_subscription()
630648
std::vector<SubscriptionSet> pending_subscriptions = sub_store->get_pending_subscriptions();
@@ -633,8 +651,8 @@ LocalVersionIDs perform_client_reset_diff(DBRef db_local, DBRef db_remote, sync:
633651
// now that the state of the fresh and local Realms are identical,
634652
// reset the local sync history.
635653
// Note that we do not set the new file ident yet! This is done in the last commit.
636-
history_local->set_client_reset_adjustments(wt_local->get_version(), orig_file_ident, fresh_server_version,
637-
recovered_changeset);
654+
history_local.set_client_reset_adjustments(wt_local->get_version(), orig_file_ident, fresh_server_version,
655+
recovered_changeset);
638656
// The local Realm is committed. There are no changes to the remote Realm.
639657
wt_remote->rollback_and_continue_as_read();
640658
wt_local->commit_and_continue_as_read();
@@ -645,9 +663,8 @@ LocalVersionIDs perform_client_reset_diff(DBRef db_local, DBRef db_remote, sync:
645663
// as needed. This has the consequence that there may be extra notifications along
646664
// the way to the final state, but since separate commits are necessary, this is
647665
// unavoidable.
648-
wt_local = db_local->start_write();
649-
RecoverLocalChangesetsHandler handler{*wt_local, *frozen_pre_local_state, logger,
650-
db_local->get_replication()};
666+
wt_local = db_local.start_write();
667+
RecoverLocalChangesetsHandler handler{*wt_local, *frozen_pre_local_state, logger, db_local.get_replication()};
651668
handler.process_changesets(local_changes, std::move(pending_subscriptions)); // throws on error
652669
// The new file ident is set as part of the final commit. This is to ensure that if
653670
// there are any exceptions during recovery, or the process is killed for some
@@ -656,45 +673,35 @@ LocalVersionIDs perform_client_reset_diff(DBRef db_local, DBRef db_remote, sync:
656673
// partially recovered, but interrupted may continue sync the next time it is
657674
// opened with only partially recovered state while having lost the history of any
658675
// offline modifications.
659-
history_local->set_client_file_ident_in_wt(wt_local->get_version(), client_file_ident);
676+
history_local.set_client_file_ident_in_wt(wt_local->get_version(), client_file_ident);
660677
wt_local->commit_and_continue_as_read();
661678
}
662679
else {
663-
if (recover_local_changes) {
664-
// In PBS recovery, the strategy is to apply all local changes to the remote realm first,
665-
// and then transfer the modified state all at once to the local Realm. This creates a
666-
// nice side effect for notifications because only the minimal state change is made.
667-
RecoverLocalChangesetsHandler handler{*wt_remote, *frozen_pre_local_state, logger,
668-
db_remote->get_replication()};
669-
handler.process_changesets(local_changes, {}); // throws on error
670-
ClientReplication* client_repl = dynamic_cast<ClientReplication*>(wt_remote->get_replication());
671-
REALM_ASSERT_RELEASE(client_repl);
672-
ChangesetEncoder& encoder = client_repl->get_instruction_encoder();
673-
const sync::ChangesetEncoder::Buffer& buffer = encoder.buffer();
674-
recovered_changeset = {buffer.data(), buffer.size()};
675-
}
680+
// In PBS recovery, the strategy is to apply all local changes to the remote realm first,
681+
// and then transfer the modified state all at once to the local Realm. This creates a
682+
// nice side effect for notifications because only the minimal state change is made.
683+
RecoverLocalChangesetsHandler handler{*wt_remote, *frozen_pre_local_state, logger,
684+
db_remote.get_replication()};
685+
handler.process_changesets(local_changes, {}); // throws on error
686+
ChangesetEncoder& encoder = repl_remote.get_instruction_encoder();
687+
const sync::ChangesetEncoder::Buffer& buffer = encoder.buffer();
688+
recovered_changeset = {buffer.data(), buffer.size()};
676689

677690
// transform the local Realm such that all public tables become identical to the remote Realm
678691
transfer_group(*wt_remote, *wt_local, logger, recover_local_changes);
679692

680693
// now that the state of the fresh and local Realms are identical,
681694
// reset the local sync history and steal the fresh Realm's ident
682-
history_local->set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version,
683-
recovered_changeset);
695+
history_local.set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version,
696+
recovered_changeset);
684697

685698
// Finally, the local Realm is committed. The changes to the remote Realm are discarded.
686699
wt_remote->rollback_and_continue_as_read();
687700
wt_local->commit_and_continue_as_read();
688-
689-
// If in FLX mode, make a copy of the active subscription set and mark it as
690-
// complete. This will cause all other subscription sets to become superceded.
691-
// In DiscardLocal mode, only the active subscription set is preserved, so we
692-
// are done.
693-
remake_active_subscription();
694701
}
695702

696703
if (did_recover_out) {
697-
*did_recover_out = recover_local_changes;
704+
*did_recover_out = true;
698705
}
699706
VersionID new_version_local = wt_local->get_version_of_current_transaction();
700707
logger.info("perform_client_reset_diff is done, old_version.version = %1, "

src/realm/sync/noinst/client_reset.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ struct PendingReset {
6767
ClientResyncMode type;
6868
Timestamp time;
6969
};
70-
void remove_pending_client_resets(TransactionRef wt);
71-
util::Optional<PendingReset> has_pending_reset(TransactionRef wt);
72-
void track_reset(TransactionRef wt, ClientResyncMode mode);
70+
void remove_pending_client_resets(Transaction& wt);
71+
util::Optional<PendingReset> has_pending_reset(const Transaction& wt);
72+
void track_reset(Transaction& wt, ClientResyncMode mode);
7373

7474
// preform_client_reset_diff() takes the Realm performs a client reset on
7575
// the Realm in 'path_local' given the Realm 'path_fresh' as the source of truth.
@@ -86,7 +86,7 @@ struct LocalVersionIDs {
8686
realm::VersionID new_version;
8787
};
8888

89-
LocalVersionIDs perform_client_reset_diff(DBRef db, DBRef db_remote, sync::SaltedFileIdent client_file_ident,
89+
LocalVersionIDs perform_client_reset_diff(DB& db, DB& db_remote, sync::SaltedFileIdent client_file_ident,
9090
util::Logger& logger, ClientResyncMode mode, bool recovery_is_allowed,
9191
bool* did_recover_out, sync::SubscriptionStore* sub_store,
9292
util::UniqueFunction<void(int64_t)> on_flx_version_complete);

src/realm/sync/noinst/client_reset_operation.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ bool ClientResetOperation::finalize(sync::SaltedFileIdent salted_file_ident, syn
9999
}
100100
bool did_recover_out = false;
101101
local_version_ids = client_reset::perform_client_reset_diff(
102-
m_db, m_db_fresh, m_salted_file_ident, m_logger, m_mode, m_recovery_is_allowed, &did_recover_out, sub_store,
102+
*m_db, *m_db_fresh, m_salted_file_ident, m_logger, m_mode, m_recovery_is_allowed, &did_recover_out, sub_store,
103103
std::move(on_flx_version_complete)); // throws
104104

105105
if (m_notify_after) {

0 commit comments

Comments
 (0)