@@ -1158,31 +1158,34 @@ async fn process_open_request(
11581158 "Starting direct SUBSCRIBE operation (legacy mode)" ,
11591159 ) ;
11601160
1161- // Legacy mode: direct operation without deduplication
1162- let op_id =
1163- crate :: node:: subscribe ( op_manager. clone ( ) , key, Some ( client_id) )
1164- . await
1165- . inspect_err ( |err| {
1166- tracing:: error!( "Subscribe error: {}" , err) ;
1167- } ) ?;
1168-
1169- tracing:: debug!(
1170- request_id = %request_id,
1171- transaction_id = %op_id,
1172- operation = "subscribe" ,
1173- "Request-Transaction correlation"
1174- ) ;
1161+ // Legacy mode: generate transaction, register first, then run op
1162+ let tx = crate :: message:: Transaction :: new :: <
1163+ crate :: operations:: subscribe:: SubscribeMsg ,
1164+ > ( ) ;
11751165
11761166 op_manager
11771167 . ch_outbound
1178- . waiting_for_transaction_result ( op_id , client_id, request_id)
1168+ . waiting_for_transaction_result ( tx , client_id, request_id)
11791169 . await
11801170 . inspect_err ( |err| {
11811171 tracing:: error!(
11821172 "Error waiting for transaction result: {}" ,
11831173 err
11841174 ) ;
11851175 } ) ?;
1176+
1177+ crate :: node:: subscribe_with_id ( op_manager. clone ( ) , key, None , Some ( tx) )
1178+ . await
1179+ . inspect_err ( |err| {
1180+ tracing:: error!( "Subscribe error: {}" , err) ;
1181+ } ) ?;
1182+
1183+ tracing:: debug!(
1184+ request_id = %request_id,
1185+ transaction_id = %tx,
1186+ operation = "subscribe" ,
1187+ "Request-Transaction correlation"
1188+ ) ;
11861189 }
11871190 }
11881191 _ => {
0 commit comments