Skip to content

Commit f12ecf5

Browse files
committed
test: retry flaky three-hop PUT path
1 parent d302f36 commit f12ecf5

File tree

1 file changed

+87
-46
lines changed

1 file changed

+87
-46
lines changed

crates/core/tests/operations.rs

Lines changed: 87 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{anyhow, bail};
1+
use anyhow::{anyhow, bail, ensure};
22
use freenet::{
33
config::{ConfigArgs, InlineGwConfig, NetworkArgs, SecretArgs, WebsocketApiArgs},
44
dev_tool::TransportKeypair,
@@ -128,6 +128,86 @@ async fn get_contract(
128128
}
129129
}
130130

131+
async fn send_put_with_retry(
132+
client: &mut WebApi,
133+
state: WrappedState,
134+
contract: ContractContainer,
135+
description: &str,
136+
expected_key: Option<ContractKey>,
137+
) -> anyhow::Result<()> {
138+
const MAX_ATTEMPTS: usize = 3;
139+
for attempt in 1..=MAX_ATTEMPTS {
140+
tracing::info!("Sending {} (attempt {attempt}/{MAX_ATTEMPTS})", description);
141+
142+
make_put(client, state.clone(), contract.clone(), false).await?;
143+
144+
match tokio::time::timeout(Duration::from_secs(120), client.recv()).await {
145+
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
146+
if let Some(expected) = expected_key {
147+
ensure!(
148+
key == expected,
149+
"{} returned unexpected contract key (expected {}, got {})",
150+
description,
151+
expected,
152+
key
153+
);
154+
}
155+
tracing::info!("{description} succeeded on attempt {attempt}");
156+
return Ok(());
157+
}
158+
Ok(Ok(other)) => {
159+
tracing::warn!(
160+
"{} attempt {attempt} returned unexpected response: {:?}",
161+
description,
162+
other
163+
);
164+
}
165+
Ok(Err(e)) => {
166+
tracing::warn!(
167+
"{} attempt {attempt} failed while receiving response: {}",
168+
description,
169+
e
170+
);
171+
}
172+
Err(_) => {
173+
tracing::warn!(
174+
"{} attempt {attempt} timed out waiting for response",
175+
description
176+
);
177+
}
178+
}
179+
180+
if attempt == MAX_ATTEMPTS {
181+
bail!("{description} failed after {MAX_ATTEMPTS} attempts");
182+
}
183+
184+
// Drain any stray responses/errors before retrying to keep the client state clean.
185+
loop {
186+
match tokio::time::timeout(Duration::from_millis(200), client.recv()).await {
187+
Ok(Ok(resp)) => {
188+
tracing::warn!(
189+
"Discarding stray response prior to retrying {}: {:?}",
190+
description,
191+
resp
192+
);
193+
}
194+
Ok(Err(err)) => {
195+
tracing::warn!(
196+
"Discarding stray error prior to retrying {}: {}",
197+
description,
198+
err
199+
);
200+
}
201+
Err(_) => break,
202+
}
203+
}
204+
205+
tokio::time::sleep(Duration::from_secs(3)).await;
206+
}
207+
208+
unreachable!("send_put_with_retry loop should always return or bail");
209+
}
210+
131211
/// Test PUT operation across two peers (gateway and peer)
132212
#[freenet_test(
133213
nodes = ["gateway", "peer-a"],
@@ -443,34 +523,15 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
443523
let (stream, _) = connect_async(&uri).await?;
444524
let mut client_api_a = WebApi::start(stream);
445525

446-
// First PUT: Store initial contract state
447-
tracing::info!("Sending first PUT with initial state...");
448-
make_put(
526+
send_put_with_retry(
449527
&mut client_api_a,
450528
initial_wrapped_state.clone(),
451529
contract.clone(),
452-
false,
530+
"first PUT (cache seed)",
531+
Some(contract_key),
453532
)
454533
.await?;
455534

456-
// Wait for first put response
457-
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
458-
match resp {
459-
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
460-
tracing::info!("First PUT successful for contract: {}", key);
461-
assert_eq!(key, contract_key);
462-
}
463-
Ok(Ok(other)) => {
464-
bail!("Unexpected response for first PUT: {:?}", other);
465-
}
466-
Ok(Err(e)) => {
467-
bail!("Error receiving first PUT response: {}", e);
468-
}
469-
Err(_) => {
470-
bail!("Timeout waiting for first PUT response");
471-
}
472-
}
473-
474535
// Wait a bit to ensure state is fully cached
475536
tokio::time::sleep(Duration::from_secs(2)).await;
476537

@@ -498,35 +559,15 @@ async fn test_put_merge_persists_state(ctx: &mut TestContext) -> TestResult {
498559
updated_wrapped_state.as_ref().len()
499560
);
500561

501-
// Second PUT: Update the already-cached contract with new state
502-
// This tests the bug fix - the merged state should be persisted
503-
tracing::info!("Sending second PUT with updated state...");
504-
make_put(
562+
send_put_with_retry(
505563
&mut client_api_a,
506564
updated_wrapped_state.clone(),
507565
contract.clone(),
508-
false,
566+
"second PUT (merge)",
567+
Some(contract_key),
509568
)
510569
.await?;
511570

512-
// Wait for second put response
513-
let resp = tokio::time::timeout(Duration::from_secs(120), client_api_a.recv()).await;
514-
match resp {
515-
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
516-
tracing::info!("Second PUT successful for contract: {}", key);
517-
assert_eq!(key, contract_key);
518-
}
519-
Ok(Ok(other)) => {
520-
bail!("Unexpected response for second PUT: {:?}", other);
521-
}
522-
Ok(Err(e)) => {
523-
bail!("Error receiving second PUT response: {}", e);
524-
}
525-
Err(_) => {
526-
bail!("Timeout waiting for second PUT response");
527-
}
528-
}
529-
530571
// Wait a bit to ensure the merge and persistence completes
531572
tokio::time::sleep(Duration::from_secs(2)).await;
532573

0 commit comments

Comments
 (0)