@@ -745,15 +745,11 @@ namespace {
745745// field. Appends as many operations as possible until either the constructed object exceeds the
746746// 16MB limit or the maximum number of transaction statements allowed in one entry.
747747//
748- // If 'limitSize' is false, then it attempts to include all given operations, regardless of whether
749- // or not they fit. If the ops don't fit, TransactionTooLarge will be thrown in that case.
750- //
751748// Returns an iterator to the first statement that wasn't packed into the applyOps object.
752749std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps (
753750 BSONObjBuilder* applyOpsBuilder,
754751 std::vector<repl::ReplOperation>::const_iterator stmtBegin,
755- std::vector<repl::ReplOperation>::const_iterator stmtEnd,
756- bool limitSize) {
752+ std::vector<repl::ReplOperation>::const_iterator stmtEnd) {
757753
758754 std::vector<repl::ReplOperation>::const_iterator stmtIter;
759755 BSONArrayBuilder opsArray (applyOpsBuilder->subarrayStart (" applyOps" _sd));
@@ -765,11 +761,9 @@ std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApp
765761 // BSON overhead and the other applyOps fields. But if the array with a single operation
766762 // exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation
767763 // should be able to be applied.
768- if (limitSize &&
769- (opsArray.arrSize () == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
770- (opsArray.arrSize () > 0 &&
771- (opsArray.len () + OplogEntry::getDurableReplOperationSize (stmt) >
772- BSONObjMaxUserSize))))
764+ if (opsArray.arrSize () == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
765+ (opsArray.arrSize () > 0 &&
766+ (opsArray.len () + OplogEntry::getDurableReplOperationSize (stmt) > BSONObjMaxUserSize)))
773767 break ;
774768 opsArray.append (stmt.toBSON ());
775769 }
@@ -880,8 +874,8 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
880874 while (stmtsIter != stmts.end ()) {
881875
882876 BSONObjBuilder applyOpsBuilder;
883- auto nextStmt = packTransactionStatementsForApplyOps (
884- &applyOpsBuilder, stmtsIter, stmts.end (), true /* limitSize */ );
877+ auto nextStmt =
878+ packTransactionStatementsForApplyOps ( &applyOpsBuilder, stmtsIter, stmts.end ());
885879
886880 // If we packed the last op, then the next oplog entry we log should be the implicit
887881 // commit or implicit prepare, i.e. we omit the 'partialTxn' field.
@@ -999,41 +993,14 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
999993 return ;
1000994
1001995 repl::OpTime commitOpTime;
1002- // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in
1003- // a single call into the OpObserver. Therefore, we store the result here and pass it as an
1004- // argument.
1005- const auto fcv = serverGlobalParams.featureCompatibility .getVersion ();
1006- if (fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 ) {
1007- auto txnParticipant = TransactionParticipant::get (opCtx);
1008- const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime ();
1009- invariant (lastWriteOpTime.isNull ());
1010- MutableOplogEntry oplogEntry;
1011- oplogEntry.setPrevWriteOpTimeInTransaction (lastWriteOpTime);
1012- oplogEntry.setStatementId (StmtId (0 ));
1013-
1014- BSONObjBuilder applyOpsBuilder;
1015- // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no longer
1016- // needed.
1017- packTransactionStatementsForApplyOps (
1018- &applyOpsBuilder, statements.begin (), statements.end (), false /* limitSize */ );
1019- oplogEntry.setObject (applyOpsBuilder.done ());
1020-
1021- auto txnState = boost::make_optional (
1022- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42 ,
1023- DurableTxnStateEnum::kCommitted );
1024- commitOpTime = logApplyOpsForTransaction (
1025- opCtx, &oplogEntry, txnState, boost::none, true /* updateTxnTable */ )
1026- .writeOpTime ;
1027- } else {
1028- // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
1029- // reserve enough entries for all statements in the transaction.
1030- auto oplogSlots = repl::getNextOpTimes (opCtx, statements.size ());
1031- invariant (oplogSlots.size () == statements.size ());
1032-
1033- // Log in-progress entries for the transaction along with the implicit commit.
1034- int numOplogEntries = logOplogEntriesForTransaction (opCtx, statements, oplogSlots, false );
1035- commitOpTime = oplogSlots[numOplogEntries - 1 ];
1036- }
996+ // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
997+ // reserve enough entries for all statements in the transaction.
998+ auto oplogSlots = repl::getNextOpTimes (opCtx, statements.size ());
999+ invariant (oplogSlots.size () == statements.size ());
1000+
1001+ // Log in-progress entries for the transaction along with the implicit commit.
1002+ int numOplogEntries = logOplogEntriesForTransaction (opCtx, statements, oplogSlots, false );
1003+ commitOpTime = oplogSlots[numOplogEntries - 1 ];
10371004 invariant (!commitOpTime.isNull ());
10381005 shardObserveTransactionPrepareOrUnpreparedCommit (opCtx, statements, commitOpTime);
10391006}
@@ -1074,48 +1041,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
10741041 return ;
10751042 }
10761043
1077- // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in
1078- // a single call into the OpObserver. Therefore, we store the result here and pass it as an
1079- // argument.
1080- const auto fcv = serverGlobalParams.featureCompatibility .getVersion ();
1081- if (fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42 ) {
1082- // We write the oplog entry in a side transaction so that we do not commit the now-prepared
1083- // transaction.
1084- // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp
1085- // and allow this transaction to be continued on failover.
1086- TransactionParticipant::SideTransactionBlock sideTxn (opCtx);
1087-
1088- writeConflictRetry (
1089- opCtx, " onTransactionPrepare" , NamespaceString::kRsOplogNamespace .ns (), [&] {
1090- // Writes to the oplog only require a Global intent lock. Guaranteed by
1091- // OplogSlotReserver.
1092- invariant (opCtx->lockState ()->isWriteLocked ());
1093-
1094- WriteUnitOfWork wuow (opCtx);
1095- auto txnParticipant = TransactionParticipant::get (opCtx);
1096- const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime ();
1097- invariant (lastWriteOpTime.isNull ());
1098-
1099- MutableOplogEntry oplogEntry;
1100- oplogEntry.setOpTime (prepareOpTime);
1101- oplogEntry.setPrevWriteOpTimeInTransaction (lastWriteOpTime);
1102-
1103- BSONObjBuilder applyOpsBuilder;
1104- // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no
1105- // longer needed.
1106- packTransactionStatementsForApplyOps (
1107- &applyOpsBuilder, statements.begin (), statements.end (), false /* limitSize */ );
1108- applyOpsBuilder.append (" prepare" , true );
1109- oplogEntry.setObject (applyOpsBuilder.done ());
1110-
1111- auto txnState = boost::make_optional (
1112- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42 ,
1113- DurableTxnStateEnum::kPrepared );
1114- logApplyOpsForTransaction (
1115- opCtx, &oplogEntry, txnState, prepareOpTime, true /* updateTxnTable */ );
1116- wuow.commit ();
1117- });
1118- } else {
1044+ {
11191045 // We should have reserved enough slots.
11201046 invariant (reservedSlots.size () >= statements.size ());
11211047 TransactionParticipant::SideTransactionBlock sideTxn (opCtx);
0 commit comments