From 89fa5ce00ca655ec239583d617abbb3a1381269b Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Mon, 13 Oct 2025 20:11:12 +0800 Subject: [PATCH 01/27] Stream snapshot downloads to disk directly. --- src/dfx/src/commands/canister/snapshot.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 107bf9c405..52b65919ad 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -23,6 +23,7 @@ use indicatif::HumanBytes; use itertools::Itertools; use slog::{debug, error, info}; use time::{OffsetDateTime, macros::format_description}; +use tokio::io::AsyncWriteExt; use crate::lib::{ environment::Environment, @@ -584,23 +585,22 @@ async fn store_data( BlobKind::StableMemory => "stable memory", }; - let blob = read_blob( + write_blob( env, canister, canister_id, snapshot_id, blob_kind, length, + &file_path, retry_policy.clone(), call_sender, ) .await .with_context(|| { - format!("Failed to read {message} from snapshot {snapshot_id} in canister {canister}") + format!("Failed to download {message} from snapshot {snapshot_id} in canister {canister}") })?; - std::fs::write(&file_path, &blob) - .with_context(|| format!("Failed to write {message} to '{}'", file_path.display()))?; debug!( env.get_logger(), "The {message} has been saved to '{}'", @@ -610,17 +610,19 @@ async fn store_data( Ok(()) } -async fn read_blob( +async fn write_blob( env: &dyn Environment, canister: &str, canister_id: Principal, snapshot: &SnapshotId, blob_kind: BlobKind, length: usize, + file_path: &PathBuf, retry_policy: ExponentialBackoff, call_sender: &CallSender, -) -> DfxResult> { - let mut blob: Vec = vec![0; length]; +) -> DfxResult { + let mut file = tokio::fs::File::create(file_path).await?; + let mut offset = 0; while offset < length { let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); @@ -659,11 +661,14 @@ async fn read_blob( }) .await? .chunk; - blob[offset..offset + chunk_size].copy_from_slice(&chunk); + file.write_all(&chunk).await?; + offset += chunk_size; } - Ok(blob) + file.flush().await?; + + Ok(()) } async fn upload_data( From af8d9ed42b6eafe578d2fbbd1e61d68ecb2d9b34 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Mon, 13 Oct 2025 22:06:02 +0800 Subject: [PATCH 02/27] Add progress bar for snapshot downloading. 
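
The chunked download loop now drives an indicatif progress bar that is
sized to the blob length and advanced after every chunk is written. A
minimal sketch of the wiring this patch adds (chunk fetching elided;
the names match the code in the diff below):

    let pb = get_progress_bar();          // indicatif bar with a {bytes}/{total_bytes} template
    pb.set_length(length as u64);
    let mut offset = 0;
    while offset < length {
        let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE);
        // ... fetch this chunk from the snapshot and write it to the file ...
        offset += chunk_size;
        pb.set_position(offset as u64);   // bytes written so far
    }
    pb.finish();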
--- src/dfx/src/commands/canister/snapshot.rs | 25 ++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 52b65919ad..30ef309be9 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -19,7 +19,7 @@ use ic_management_canister_types::{ SnapshotDataOffset, UploadCanisterSnapshotDataArgs, UploadCanisterSnapshotMetadataArgs, UploadCanisterSnapshotMetadataResult, }; -use indicatif::HumanBytes; +use indicatif::{HumanBytes, ProgressStyle}; use itertools::Itertools; use slog::{debug, error, info}; use time::{OffsetDateTime, macros::format_description}; @@ -585,6 +585,8 @@ async fn store_data( BlobKind::StableMemory => "stable memory", }; + info!(env.get_logger(), "Downloading {message}"); + write_blob( env, canister, @@ -601,9 +603,9 @@ async fn store_data( format!("Failed to download {message} from snapshot {snapshot_id} in canister {canister}") })?; - debug!( + info!( env.get_logger(), - "The {message} has been saved to '{}'", + "\nThe {message} has been saved to '{}'", file_path.display() ); @@ -621,8 +623,10 @@ async fn write_blob( retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { - let mut file = tokio::fs::File::create(file_path).await?; + let pb = get_progress_bar(); + pb.set_length(length as u64); + let mut file = tokio::fs::File::create(file_path).await?; let mut offset = 0; while offset < length { let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); @@ -664,10 +668,12 @@ async fn write_blob( file.write_all(&chunk).await?; offset += chunk_size; + pb.set_position(offset as u64); } - file.flush().await?; + pb.finish(); + Ok(()) } @@ -769,3 +775,12 @@ fn is_retryable(error: &Error) -> bool { false } + +fn get_progress_bar() -> indicatif::ProgressBar { + let pb = indicatif::ProgressBar::new(0); + pb.set_style(ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})") + .expect("Failed to set template string") + .progress_chars("#>-")); + pb +} From f9b17cc3ce6549e952ad7ae1f9a2e620f7d094b6 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Tue, 14 Oct 2025 14:32:13 +0800 Subject: [PATCH 03/27] Stream snapshot uploading from disk directly. 
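
Instead of reading the whole blob into memory with std::fs::read, the
upload path now opens the file once and reads each chunk on demand
right before it is sent. A minimal sketch of the loop this patch
introduces (retries and error context omitted; the names match the
code in the diff below):

    use tokio::io::AsyncReadExt;

    let length = std::fs::metadata(&file_path)?.len() as usize;
    let mut file = tokio::fs::File::open(&file_path).await?;
    let mut offset = 0;
    while offset < length {
        let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE);
        let mut chunk = vec![0u8; chunk_size];
        file.read_exact(&mut chunk).await?;   // read only this chunk from disk
        // ... upload `chunk` at `offset` via upload_canister_snapshot_data ...
        offset += chunk_size;
    }

This keeps peak memory at one chunk (MAX_CHUNK_SIZE) instead of the
full snapshot blob.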
--- src/dfx/src/commands/canister/snapshot.rs | 44 ++++++++++++++--------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 30ef309be9..1ac1a0d216 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -23,7 +23,7 @@ use indicatif::{HumanBytes, ProgressStyle}; use itertools::Itertools; use slog::{debug, error, info}; use time::{OffsetDateTime, macros::format_description}; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::lib::{ environment::Environment, @@ -644,12 +644,13 @@ async fn write_blob( size: chunk_size as u64, }, }; + + let data_args = ReadCanisterSnapshotDataArgs { + canister_id, + snapshot_id: snapshot.0.clone(), + kind, + }; let chunk = retry(retry_policy.clone(), || async { - let data_args = ReadCanisterSnapshotDataArgs { - canister_id, - snapshot_id: snapshot.0.clone(), - kind: kind.clone(), - }; match read_canister_snapshot_data(env, canister_id, &data_args, call_sender).await { Ok(chunk) => Ok(chunk), Err(error) if is_retryable(&error) => { @@ -692,15 +693,14 @@ async fn upload_data( BlobKind::MainMemory => "Wasm memory", BlobKind::StableMemory => "stable memory", }; - let blob = std::fs::read(&file_path) - .with_context(|| format!("Failed to read {message} from '{}'", file_path.display()))?; + upload_blob( env, canister, canister_id, snapshot_id, blob_kind, - blob, + &file_path, retry_policy.clone(), call_sender, ) @@ -722,11 +722,17 @@ async fn upload_blob( canister_id: Principal, snapshot: &SnapshotId, blob_kind: BlobKind, - data: Vec, + file_path: &PathBuf, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { - let length = data.len(); + let length = std::fs::metadata(file_path) + .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? + .len() as usize; + + let mut file = tokio::fs::File::open(file_path) + .await + .with_context(|| format!("Failed to open file '{}' for reading", file_path.display()))?; let mut offset = 0; while offset < length { let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); @@ -741,13 +747,17 @@ async fn upload_blob( offset: offset as u64, }, }; + + let mut chunk = vec![0u8; chunk_size]; + file.read_exact(&mut chunk).await?; + + let data_args = UploadCanisterSnapshotDataArgs { + canister_id, + snapshot_id: snapshot.0.clone(), + kind, + chunk, + }; retry(retry_policy.clone(), || async { - let data_args = UploadCanisterSnapshotDataArgs { - canister_id, - snapshot_id: snapshot.0.clone(), - kind: kind.clone(), - chunk: data[offset..offset + chunk_size].to_vec(), - }; match upload_canister_snapshot_data(env, canister_id, &data_args, call_sender).await { Ok(_) => Ok(()), Err(error) if is_retryable(&error) => { From 262b52a3664bd7ddfc672720489190b3792ecb12 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Tue, 14 Oct 2025 15:47:29 +0800 Subject: [PATCH 04/27] Add progress bar for snapshot uploading. 
--- src/dfx/src/commands/canister/snapshot.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 1ac1a0d216..5362706f16 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -694,6 +694,8 @@ async fn upload_data( BlobKind::StableMemory => "stable memory", }; + info!(env.get_logger(), "Uploading {message}"); + upload_blob( env, canister, @@ -708,9 +710,10 @@ async fn upload_data( .with_context(|| { format!("Failed to upload {message} to snapshot {snapshot_id} in canister {canister}") })?; - debug!( + info!( env.get_logger(), - "Snapshot {message} uploaded to canister {canister} with Snapshot ID: {snapshot_id}" + "The {message} has been uploaded from '{}'", + file_path.display() ); Ok(()) @@ -730,6 +733,9 @@ async fn upload_blob( .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? .len() as usize; + let pb = get_progress_bar(); + pb.set_length(length as u64); + let mut file = tokio::fs::File::open(file_path) .await .with_context(|| format!("Failed to open file '{}' for reading", file_path.display()))?; @@ -773,8 +779,11 @@ async fn upload_blob( }) .await?; offset += chunk_size; + pb.set_position(offset as u64); } + pb.finish(); + Ok(()) } From 0f6f44ad5aeb4e756b2e50576e283ab9b50e3c89 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Tue, 14 Oct 2025 17:14:39 +0800 Subject: [PATCH 05/27] Implement new_progress stub in env. --- src/dfx/src/commands/canister/snapshot.rs | 11 +++++------ src/dfx/src/lib/environment.rs | 17 +++++++++++------ src/dfx/src/lib/progress_bar.rs | 14 +++++++++++++- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 5362706f16..98691ac399 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -33,6 +33,7 @@ use crate::lib::{ load_canister_snapshot, read_canister_snapshot_data, read_canister_snapshot_metadata, take_canister_snapshot, upload_canister_snapshot_data, upload_canister_snapshot_metadata, }, + progress_bar::ProgressBar, retryable::retryable, root_key::fetch_root_key_if_needed, }; @@ -623,8 +624,7 @@ async fn write_blob( retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { - let pb = get_progress_bar(); - pb.set_length(length as u64); + let pb = get_progress_bar(env, length as u64); let mut file = tokio::fs::File::create(file_path).await?; let mut offset = 0; @@ -733,8 +733,7 @@ async fn upload_blob( .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? 
.len() as usize; - let pb = get_progress_bar(); - pb.set_length(length as u64); + let pb = get_progress_bar(env, length as u64); let mut file = tokio::fs::File::open(file_path) .await @@ -795,8 +794,8 @@ fn is_retryable(error: &Error) -> bool { false } -fn get_progress_bar() -> indicatif::ProgressBar { - let pb = indicatif::ProgressBar::new(0); +fn get_progress_bar(env: &dyn Environment, total_size: u64) -> ProgressBar { + let pb = env.new_progress(total_size); pb.set_style(ProgressStyle::default_bar() .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})") .expect("Failed to set template string") diff --git a/src/dfx/src/lib/environment.rs b/src/dfx/src/lib/environment.rs index b551926351..e647f56baf 100644 --- a/src/dfx/src/lib/environment.rs +++ b/src/dfx/src/lib/environment.rs @@ -60,7 +60,7 @@ pub trait Environment: Send + Sync { fn get_verbose_level(&self) -> i64; fn new_spinner(&self, message: Cow<'static, str>) -> ProgressBar; fn with_suspend_all_spinners(&self, f: Box); // box needed for dyn Environment - fn new_progress(&self, message: &str) -> ProgressBar; + fn new_progress(&self, total_size: u64) -> ProgressBar; fn new_identity_manager(&self) -> Result { IdentityManager::new( @@ -294,8 +294,13 @@ impl Environment for EnvironmentImpl { self.spinners.suspend(f); } - fn new_progress(&self, _message: &str) -> ProgressBar { - ProgressBar::discard() + fn new_progress(&self, total_size: u64) -> ProgressBar { + // Only show the progress bar if the level is INFO or more. + if self.verbose_level >= 0 { + ProgressBar::new_progress(total_size, &self.spinners) + } else { + ProgressBar::discard() + } } fn get_selected_identity(&self) -> Option<&String> { @@ -481,8 +486,8 @@ impl<'a> Environment for AgentEnvironment<'a> { self.backend.with_suspend_all_spinners(f); } - fn new_progress(&self, message: &str) -> ProgressBar { - self.backend.new_progress(message) + fn new_progress(&self, total_size: u64) -> ProgressBar { + self.backend.new_progress(total_size) } fn get_selected_identity(&self) -> Option<&String> { @@ -604,7 +609,7 @@ pub mod test_env { fn new_identity_manager(&self) -> Result { unimplemented!() } - fn new_progress(&self, _message: &str) -> ProgressBar { + fn new_progress(&self, _total_size: u64) -> ProgressBar { ProgressBar::discard() } fn with_suspend_all_spinners(&self, f: Box) { diff --git a/src/dfx/src/lib/progress_bar.rs b/src/dfx/src/lib/progress_bar.rs index ccb7fccd90..53eca86309 100644 --- a/src/dfx/src/lib/progress_bar.rs +++ b/src/dfx/src/lib/progress_bar.rs @@ -1,5 +1,5 @@ #![allow(clippy::disallowed_types)] -use indicatif::{MultiProgress, ProgressBar as IndicatifProgressBar}; +use indicatif::{MultiProgress, ProgressBar as IndicatifProgressBar, ProgressStyle}; use std::{borrow::Cow, time::Duration}; pub struct ProgressBar { @@ -36,8 +36,20 @@ impl ProgressBar { } } + pub fn new_progress(total_size: u64, set: &MultiProgress) -> Self { + let progress_bar = IndicatifProgressBar::new(total_size); + set.add(progress_bar.clone()); + + ProgressBar { + bar: Some(progress_bar), + } + } + + forward_fn_impl!(finish); forward_fn_impl!(finish_and_clear); forward_fn_impl!(set_message, message: Cow<'static, str>); + forward_fn_impl!(set_position, position: u64); + forward_fn_impl!(set_style, style: ProgressStyle); pub fn discard() -> Self { ProgressBar { bar: None } From 129985469115bf3bb61bb967807b6ec52e33eabf Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 15 Oct 2025 17:05:52 +0800 Subject: [PATCH 06/27] 
Change the log level. --- src/dfx/src/commands/canister/snapshot.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 98691ac399..e7dec83fc9 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -586,7 +586,7 @@ async fn store_data( BlobKind::StableMemory => "stable memory", }; - info!(env.get_logger(), "Downloading {message}"); + debug!(env.get_logger(), "Downloading {message}"); write_blob( env, @@ -604,9 +604,9 @@ async fn store_data( format!("Failed to download {message} from snapshot {snapshot_id} in canister {canister}") })?; - info!( + debug!( env.get_logger(), - "\nThe {message} has been saved to '{}'", + "The {message} has been saved to '{}'", file_path.display() ); @@ -694,7 +694,7 @@ async fn upload_data( BlobKind::StableMemory => "stable memory", }; - info!(env.get_logger(), "Uploading {message}"); + debug!(env.get_logger(), "Uploading {message}"); upload_blob( env, @@ -710,7 +710,7 @@ async fn upload_data( .with_context(|| { format!("Failed to upload {message} to snapshot {snapshot_id} in canister {canister}") })?; - info!( + debug!( env.get_logger(), "The {message} has been uploaded from '{}'", file_path.display() From c48795c7f82c49b0f273706b900c469b0b3d5408 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 15 Oct 2025 20:37:25 +0800 Subject: [PATCH 07/27] Support snapshot downloading with resuming. --- src/dfx/src/commands/canister/snapshot.rs | 100 ++++++++++++++++------ 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index e7dec83fc9..b71d3060b3 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -84,6 +84,9 @@ enum SnapshotSubcommand { /// The directory to download the snapshot to. #[arg(long, value_parser = directory_parser)] dir: PathBuf, + /// Whether to resume the download if the snapshot already exists. + #[arg(short, long, default_value = "false")] + resume: bool, }, /// Uploads a downloaded snapshot from a given directory to a canister. Upload { @@ -141,7 +144,8 @@ pub async fn exec( canister, snapshot, dir, - } => download(env, canister, snapshot, dir, call_sender).await?, + resume, + } => download(env, canister, snapshot, dir, resume, call_sender).await?, SnapshotSubcommand::Upload { canister, replace, @@ -275,31 +279,42 @@ async fn download( canister: String, snapshot: SnapshotId, dir: PathBuf, + resume: bool, call_sender: &CallSender, ) -> DfxResult { - check_dir(&dir)?; + if !resume { + check_dir(&dir)?; + } let canister_id = canister .parse() .or_else(|_| env.get_canister_id_store()?.get(&canister))?; // Store metadata. 
- let metadata_args = ReadCanisterSnapshotMetadataArgs { - canister_id, - snapshot_id: snapshot.0.clone(), - }; - let metadata = read_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) - .await - .with_context(|| { - format!("Failed to read metadata from snapshot {snapshot} in canister {canister}") - })?; let metadata_file = dir.join("metadata.json"); - save_json_file(&metadata_file, &metadata)?; - debug!( - env.get_logger(), - "Snapshot metadata saved to '{}'", - metadata_file.display() - ); + let metadata = if !metadata_file.exists() { + let metadata_args = ReadCanisterSnapshotMetadataArgs { + canister_id, + snapshot_id: snapshot.0.clone(), + }; + let metadata = + read_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) + .await + .with_context(|| { + format!( + "Failed to read metadata from snapshot {snapshot} in canister {canister}" + ) + })?; + save_json_file(&metadata_file, &metadata)?; + debug!( + env.get_logger(), + "Snapshot metadata saved to '{}'", + metadata_file.display() + ); + metadata + } else { + load_json_file(&metadata_file)? + }; let retry_policy = ExponentialBackoff::default(); @@ -313,6 +328,7 @@ async fn download( metadata.wasm_module_size as usize, dir.join("wasm_module.bin"), retry_policy.clone(), + resume, call_sender, ) .await?; @@ -327,6 +343,7 @@ async fn download( metadata.wasm_memory_size as usize, dir.join("wasm_memory.bin"), retry_policy.clone(), + resume, call_sender, ) .await?; @@ -342,6 +359,7 @@ async fn download( metadata.stable_memory_size as usize, dir.join("stable_memory.bin"), retry_policy.clone(), + resume, call_sender, ) .await?; @@ -350,16 +368,21 @@ async fn download( // Store Wasm chunks. if !metadata.wasm_chunk_store.is_empty() { let wasm_chunk_store_dir = dir.join("wasm_chunk_store"); - std::fs::create_dir(&wasm_chunk_store_dir).with_context(|| { - format!( - "Failed to create directory '{}'", - wasm_chunk_store_dir.display() - ) - })?; + if !wasm_chunk_store_dir.exists() { + std::fs::create_dir(&wasm_chunk_store_dir).with_context(|| { + format!( + "Failed to create directory '{}'", + wasm_chunk_store_dir.display() + ) + })?; + } for chunk_hash in metadata.wasm_chunk_store { let hash_str = hex::encode(&chunk_hash.hash); let chunk_file = wasm_chunk_store_dir.join(format!("{hash_str}.bin")); + if chunk_file.exists() { + continue; + } let chunk = retry(retry_policy.clone(), || async { let data_args = ReadCanisterSnapshotDataArgs { @@ -578,6 +601,7 @@ async fn store_data( length: usize, file_path: PathBuf, retry_policy: ExponentialBackoff, + resume: bool, call_sender: &CallSender, ) -> DfxResult { let message = match blob_kind { @@ -597,6 +621,7 @@ async fn store_data( length, &file_path, retry_policy.clone(), + resume, call_sender, ) .await @@ -622,12 +647,37 @@ async fn write_blob( length: usize, file_path: &PathBuf, retry_policy: ExponentialBackoff, + resume: bool, call_sender: &CallSender, ) -> DfxResult { + let mut offset = 0; + if resume && file_path.exists() { + let file_length = std::fs::metadata(file_path) + .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? + .len() as usize; + if file_length >= length { + return Ok(()); + } + offset = file_length; + + debug!( + env.get_logger(), + "Resuming download '{}' from offset {}", + file_path.display(), + offset + ); + } + let pb = get_progress_bar(env, length as u64); + pb.set_position(offset as u64); + + // Create file if it doesn't exist, otherwise open it in append mode. 
+ let mut file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(file_path) + .await?; - let mut file = tokio::fs::File::create(file_path).await?; - let mut offset = 0; while offset < length { let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); let kind = match blob_kind { From c484d67d68adce178f6fa93e55bedb978d6e1f77 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 15 Oct 2025 22:30:53 +0800 Subject: [PATCH 08/27] Add snapshot download concurrency. --- src/dfx/src/commands/canister/snapshot.rs | 140 +++++++++++++++------- 1 file changed, 98 insertions(+), 42 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index b71d3060b3..ad91a2d2f9 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -1,4 +1,5 @@ use std::{ + collections::BTreeMap, fmt::{self, Display, Formatter}, path::PathBuf, str::FromStr, @@ -13,6 +14,7 @@ use dfx_core::{ identity::CallSender, json::{load_json_file, save_json_file}, }; +use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use ic_management_canister_types::{ CanisterStatusType, LoadCanisterSnapshotArgs, ReadCanisterSnapshotDataArgs, ReadCanisterSnapshotMetadataArgs, ReadCanisterSnapshotMetadataResult, SnapshotDataKind, @@ -87,6 +89,9 @@ enum SnapshotSubcommand { /// Whether to resume the download if the snapshot already exists. #[arg(short, long, default_value = "false")] resume: bool, + /// The number of concurrent downloads to perform. + #[arg(long, default_value = "3")] + concurrency: usize, }, /// Uploads a downloaded snapshot from a given directory to a canister. Upload { @@ -145,7 +150,19 @@ pub async fn exec( snapshot, dir, resume, - } => download(env, canister, snapshot, dir, resume, call_sender).await?, + concurrency, + } => { + download( + env, + canister, + snapshot, + dir, + resume, + concurrency, + call_sender, + ) + .await? 
+ } SnapshotSubcommand::Upload { canister, replace, @@ -280,6 +297,7 @@ async fn download( snapshot: SnapshotId, dir: PathBuf, resume: bool, + concurrency: usize, call_sender: &CallSender, ) -> DfxResult { if !resume { @@ -329,6 +347,7 @@ async fn download( dir.join("wasm_module.bin"), retry_policy.clone(), resume, + concurrency, call_sender, ) .await?; @@ -344,6 +363,7 @@ async fn download( dir.join("wasm_memory.bin"), retry_policy.clone(), resume, + concurrency, call_sender, ) .await?; @@ -360,6 +380,7 @@ async fn download( dir.join("stable_memory.bin"), retry_policy.clone(), resume, + concurrency, call_sender, ) .await?; @@ -583,7 +604,7 @@ fn check_dir(dir: &PathBuf) -> DfxResult { Ok(()) } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum BlobKind { WasmModule, MainMemory, @@ -602,6 +623,7 @@ async fn store_data( file_path: PathBuf, retry_policy: ExponentialBackoff, resume: bool, + concurrency: usize, call_sender: &CallSender, ) -> DfxResult { let message = match blob_kind { @@ -622,6 +644,7 @@ async fn store_data( &file_path, retry_policy.clone(), resume, + concurrency, call_sender, ) .await @@ -648,6 +671,7 @@ async fn write_blob( file_path: &PathBuf, retry_policy: ExponentialBackoff, resume: bool, + concurrency: usize, call_sender: &CallSender, ) -> DfxResult { let mut offset = 0; @@ -678,48 +702,80 @@ async fn write_blob( .open(file_path) .await?; - while offset < length { - let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); - let kind = match blob_kind { - BlobKind::WasmModule => SnapshotDataKind::WasmModule { - offset: offset as u64, - size: chunk_size as u64, - }, - BlobKind::MainMemory => SnapshotDataKind::WasmMemory { - offset: offset as u64, - size: chunk_size as u64, - }, - BlobKind::StableMemory => SnapshotDataKind::StableMemory { - offset: offset as u64, - size: chunk_size as u64, - }, - }; + let mut next_offset = offset; + let mut next_request_offset = offset; + let mut in_progress: FuturesUnordered<_> = FuturesUnordered::new(); + let mut ready_chunks: BTreeMap> = BTreeMap::new(); + + while next_offset < length { + // Schedule next chunk to download. + // Also check the length of ready_chunks to determine if we should schedule next chunk. + // For example, if the first trunk is blocked, continue to schedule trunks will increase the memory usage. + while next_request_offset < length + && in_progress.len() < concurrency + && ready_chunks.len() < concurrency * 2 + { + let chunk_size = std::cmp::min(length - next_request_offset, MAX_CHUNK_SIZE); + let kind = match blob_kind { + BlobKind::WasmModule => SnapshotDataKind::WasmModule { + offset: next_request_offset as u64, + size: chunk_size as u64, + }, + BlobKind::MainMemory => SnapshotDataKind::WasmMemory { + offset: next_request_offset as u64, + size: chunk_size as u64, + }, + BlobKind::StableMemory => SnapshotDataKind::StableMemory { + offset: next_request_offset as u64, + size: chunk_size as u64, + }, + }; + let data_args = ReadCanisterSnapshotDataArgs { + canister_id, + snapshot_id: snapshot.0.clone(), + kind, + }; + let retry_policy = retry_policy.clone(); + + // Download chunk. 
+ in_progress.push(async move { + let chunk = retry(retry_policy, || async { + match read_canister_snapshot_data(env, canister_id, &data_args, call_sender) + .await + { + Ok(chunk) => Ok(chunk), + Err(error) if is_retryable(&error) => { + error!( + env.get_logger(), + "Failed to read {:?} from snapshot {snapshot} in canister {canister}.", + blob_kind, + ); + Err(backoff::Error::transient(anyhow!(error))) + } + Err(error) => Err(backoff::Error::permanent(anyhow!(error))), + } + }) + .await? + .chunk; + + Ok::<(usize, Vec), anyhow::Error>((next_request_offset, chunk)) + }); + + next_request_offset += chunk_size; + } - let data_args = ReadCanisterSnapshotDataArgs { - canister_id, - snapshot_id: snapshot.0.clone(), - kind, - }; - let chunk = retry(retry_policy.clone(), || async { - match read_canister_snapshot_data(env, canister_id, &data_args, call_sender).await { - Ok(chunk) => Ok(chunk), - Err(error) if is_retryable(&error) => { - error!( - env.get_logger(), - "Failed to read {:?} from snapshot {snapshot} in canister {canister}.", - blob_kind, - ); - Err(backoff::Error::transient(anyhow!(error))) - } - Err(error) => Err(backoff::Error::permanent(anyhow!(error))), - } - }) - .await? - .chunk; - file.write_all(&chunk).await?; + // Process completed chunks. + while let Some(Some(res)) = in_progress.next().now_or_never() { + let (chunk_offset, chunk) = res?; + ready_chunks.insert(chunk_offset, chunk); + } - offset += chunk_size; - pb.set_position(offset as u64); + // Write completed chunks in order. + while let Some(chunk) = ready_chunks.remove(&next_offset) { + file.write_all(&chunk).await?; + next_offset += chunk.len(); + pb.set_position(next_offset as u64); + } } file.flush().await?; From c2d54642974638d45453b4ced3166a9e3b99bd26 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Thu, 16 Oct 2025 20:28:38 +0800 Subject: [PATCH 09/27] Support snapshot uploading with resuming. --- src/dfx/src/commands/canister/snapshot.rs | 285 ++++++++++++++++------ 1 file changed, 204 insertions(+), 81 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index ad91a2d2f9..c471224c97 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -1,7 +1,8 @@ use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, fmt::{self, Display, Formatter}, - path::PathBuf, + io::SeekFrom, + path::{Path, PathBuf}, str::FromStr, }; @@ -23,9 +24,10 @@ use ic_management_canister_types::{ }; use indicatif::{HumanBytes, ProgressStyle}; use itertools::Itertools; +use serde::{Deserialize, Serialize}; use slog::{debug, error, info}; use time::{OffsetDateTime, macros::format_description}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use crate::lib::{ environment::Environment, @@ -103,10 +105,13 @@ enum SnapshotSubcommand { /// The directory to upload the snapshot from. #[arg(long, value_parser = directory_parser)] dir: PathBuf, + /// The snapshot ID to resume the upload to. 
+ #[arg(short, long)] + resume: Option, }, } -#[derive(Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct SnapshotId(Vec); impl Display for SnapshotId { @@ -167,7 +172,8 @@ pub async fn exec( canister, replace, dir, - } => upload(env, canister, replace, dir, call_sender).await?, + resume, + } => upload(env, canister, replace, dir, resume, call_sender).await?, } Ok(()) } @@ -455,76 +461,117 @@ async fn upload( canister: String, replace: Option, dir: PathBuf, + resume: Option, call_sender: &CallSender, ) -> DfxResult { let canister_id = canister .parse() .or_else(|_| env.get_canister_id_store()?.get(&canister))?; - // Upload snapshot metadata. + // Load the upload progress if provided. + let mut upload_progress = SnapshotUploadProgress::new(); + if let Some(resume_snapshot_id) = &resume { + let progress_path = dir.join(format!("{resume_snapshot_id}.json")); + if progress_path.exists() { + upload_progress = load_json_file(&progress_path)?; + } else { + bail!( + "Cannot resume uploading to snapshot {} because the progress file was not found in directory '{}'", + resume_snapshot_id, + dir.display() + ); + } + } + let metadata: ReadCanisterSnapshotMetadataResult = load_json_file(&dir.join("metadata.json"))?; - let metadata_args = UploadCanisterSnapshotMetadataArgs { - canister_id, - replace_snapshot: replace.as_ref().map(|x| x.0.clone()), - wasm_module_size: metadata.wasm_module_size, - globals: metadata.globals, - wasm_memory_size: metadata.wasm_memory_size, - stable_memory_size: metadata.stable_memory_size, - certified_data: metadata.certified_data, - global_timer: metadata.global_timer, - on_low_wasm_memory_hook_status: metadata.on_low_wasm_memory_hook_status, + let snapshot_id = if !upload_progress.metadata_uploaded { + // Upload snapshot metadata. + let metadata_args = UploadCanisterSnapshotMetadataArgs { + canister_id, + replace_snapshot: replace.as_ref().map(|x| x.0.clone()), + wasm_module_size: metadata.wasm_module_size, + globals: metadata.globals, + wasm_memory_size: metadata.wasm_memory_size, + stable_memory_size: metadata.stable_memory_size, + certified_data: metadata.certified_data, + global_timer: metadata.global_timer, + on_low_wasm_memory_hook_status: metadata.on_low_wasm_memory_hook_status, + }; + let snapshot_id: SnapshotId = + upload_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) + .await + .with_context(|| { + format!("Failed to upload snapshot metadata to canister {canister}") + })? + .into(); + upload_progress.snapshot_id = snapshot_id.to_string(); + upload_progress.metadata_uploaded = true; + + debug!( + env.get_logger(), + "Snapshot metadata uploaded to canister {canister} with Snapshot ID: {snapshot_id}", + ); + snapshot_id + } else { + SnapshotId::from_str(&upload_progress.snapshot_id)? }; - let snapshot_id = - upload_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) - .await - .with_context(|| format!("Failed to upload snapshot metadata to canister {canister}"))? - .into(); - debug!( - env.get_logger(), - "Snapshot metadata uploaded to canister {canister} with Snapshot ID: {snapshot_id}" - ); let retry_policy = ExponentialBackoff::default(); // Upload Wasm module. 
- upload_data( - env, - &canister, - canister_id, - &snapshot_id, - BlobKind::WasmModule, - dir.join("wasm_module.bin"), - retry_policy.clone(), - call_sender, - ) - .await?; + write_upload_progress_file_on_error( + upload_data( + env, + &canister, + canister_id, + &snapshot_id, + BlobKind::WasmModule, + dir.join("wasm_module.bin"), + &mut upload_progress, + retry_policy.clone(), + call_sender, + ) + .await, + &upload_progress, + &dir, + )?; // Upload Wasm memory. - upload_data( - env, - &canister, - canister_id, - &snapshot_id, - BlobKind::MainMemory, - dir.join("wasm_memory.bin"), - retry_policy.clone(), - call_sender, - ) - .await?; - - // Upload stable memory. - if metadata.stable_memory_size > 0 { + write_upload_progress_file_on_error( upload_data( env, &canister, canister_id, &snapshot_id, - BlobKind::StableMemory, - dir.join("stable_memory.bin"), + BlobKind::MainMemory, + dir.join("wasm_memory.bin"), + &mut upload_progress, retry_policy.clone(), call_sender, ) - .await?; + .await, + &upload_progress, + &dir, + )?; + + // Upload stable memory. + if metadata.stable_memory_size > 0 { + write_upload_progress_file_on_error( + upload_data( + env, + &canister, + canister_id, + &snapshot_id, + BlobKind::StableMemory, + dir.join("stable_memory.bin"), + &mut upload_progress, + retry_policy.clone(), + call_sender, + ) + .await, + &upload_progress, + &dir, + )?; } // Upload Wasm chunks. @@ -532,33 +579,49 @@ async fn upload( let wasm_chunk_store_dir = dir.join("wasm_chunk_store"); for chunk_hash in metadata.wasm_chunk_store { let hash_str = hex::encode(&chunk_hash.hash); + if *upload_progress + .wasm_chunks_uploaded + .get(&hash_str) + .unwrap_or(&false) + { + // Skip the chunk if it has already been uploaded. + continue; + } + let chunk_file = wasm_chunk_store_dir.join(format!("{hash_str}.bin")); let chunk_data = std::fs::read(&chunk_file).with_context(|| { format!("Failed to read Wasm chunk from '{}'", chunk_file.display()) })?; - retry(retry_policy.clone(), || async { - let data_args = UploadCanisterSnapshotDataArgs { - canister_id, - snapshot_id: snapshot_id.0.clone(), - kind: SnapshotDataOffset::WasmChunk, - chunk: chunk_data.clone(), - }; - match upload_canister_snapshot_data( - env, - canister_id, - &data_args, - call_sender, - ) - .await - { - Ok(_) => Ok(()), - Err(_error) => Err(backoff::Error::transient(anyhow!( - "Failed to upload Wasm chunk {hash_str} to snapshot {snapshot_id} in canister {canister}" - ))), - } - }) - .await?; + write_upload_progress_file_on_error( + retry(retry_policy.clone(), || async { + let data_args = UploadCanisterSnapshotDataArgs { + canister_id, + snapshot_id: snapshot_id.0.clone(), + kind: SnapshotDataOffset::WasmChunk, + chunk: chunk_data.clone(), + }; + match upload_canister_snapshot_data( + env, + canister_id, + &data_args, + call_sender, + ) + .await + { + Ok(_) => Ok(()), + Err(_error) => Err(backoff::Error::transient(anyhow!( + "Failed to upload Wasm chunk {hash_str} to snapshot {snapshot_id} in canister {canister}" + ))), + } + }) + .await, + &upload_progress, + &dir, + )?; + + upload_progress.wasm_chunks_uploaded.insert(hash_str, true); + debug!( env.get_logger(), "Snapshot Wasm chunk {} uploaded to canister {} with Snapshot ID: {}", @@ -574,9 +637,29 @@ async fn upload( "Uploaded a snapshot of canister {canister}. Snapshot ID: {}", snapshot_id ); + // Save the upload progress. 
+ save_json_file( + &dir.join(format!("{}.json", upload_progress.snapshot_id)), + &upload_progress, + )?; + Ok(()) } +fn write_upload_progress_file_on_error( + result: DfxResult, + upload_progress: &SnapshotUploadProgress, + dir: &Path, +) -> DfxResult { + if result.is_err() { + save_json_file( + &dir.join(format!("{}.json", upload_progress.snapshot_id)), + &upload_progress, + )?; + } + result +} + fn check_dir(dir: &PathBuf) -> DfxResult { // Check if the directory is empty. let mut entries = std::fs::read_dir(dir) @@ -611,6 +694,29 @@ enum BlobKind { StableMemory, } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SnapshotUploadProgress { + snapshot_id: String, + metadata_uploaded: bool, + wasm_module_progress: usize, + wasm_memory_progress: usize, + stable_memory_progress: usize, + wasm_chunks_uploaded: HashMap, +} + +impl SnapshotUploadProgress { + fn new() -> Self { + Self { + snapshot_id: String::new(), + metadata_uploaded: false, + wasm_module_progress: 0, + wasm_memory_progress: 0, + stable_memory_progress: 0, + wasm_chunks_uploaded: HashMap::new(), + } + } +} + const MAX_CHUNK_SIZE: usize = 2_000_000; async fn store_data( @@ -686,7 +792,7 @@ async fn write_blob( debug!( env.get_logger(), - "Resuming download '{}' from offset {}", + "Resuming downloading '{}' from offset {}", file_path.display(), offset ); @@ -791,13 +897,14 @@ async fn upload_data( snapshot_id: &SnapshotId, blob_kind: BlobKind, file_path: PathBuf, + upload_progress: &mut SnapshotUploadProgress, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { - let message = match blob_kind { - BlobKind::WasmModule => "Wasm module", - BlobKind::MainMemory => "Wasm memory", - BlobKind::StableMemory => "stable memory", + let (message, progress) = match blob_kind { + BlobKind::WasmModule => ("Wasm module", &mut upload_progress.wasm_module_progress), + BlobKind::MainMemory => ("Wasm memory", &mut upload_progress.wasm_memory_progress), + BlobKind::StableMemory => ("stable memory", &mut upload_progress.stable_memory_progress), }; debug!(env.get_logger(), "Uploading {message}"); @@ -809,6 +916,7 @@ async fn upload_data( snapshot_id, blob_kind, &file_path, + progress, retry_policy.clone(), call_sender, ) @@ -832,6 +940,7 @@ async fn upload_blob( snapshot: &SnapshotId, blob_kind: BlobKind, file_path: &PathBuf, + upload_progress: &mut usize, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { @@ -839,12 +948,24 @@ async fn upload_blob( .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? 
.len() as usize; + let mut offset = *upload_progress; + if offset > 0 && offset < length { + debug!( + env.get_logger(), + "Resuming uploading '{}' from offset {}", + file_path.display(), + offset + ); + } + let pb = get_progress_bar(env, length as u64); + pb.set_position(offset as u64); let mut file = tokio::fs::File::open(file_path) .await .with_context(|| format!("Failed to open file '{}' for reading", file_path.display()))?; - let mut offset = 0; + file.seek(SeekFrom::Start(offset as u64)).await?; + while offset < length { let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); let kind = match blob_kind { @@ -885,6 +1006,8 @@ async fn upload_blob( .await?; offset += chunk_size; pb.set_position(offset as u64); + + *upload_progress = offset; } pb.finish(); From 8caca0db837d35e5e8e7b2e78cad6fc011d76f21 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Thu, 16 Oct 2025 21:54:02 +0800 Subject: [PATCH 10/27] Add snapshot upload concurrency --- src/dfx/src/commands/canister/snapshot.rs | 163 +++++++++++++++------- 1 file changed, 110 insertions(+), 53 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index c471224c97..b844e14e0c 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -108,6 +108,9 @@ enum SnapshotSubcommand { /// The snapshot ID to resume the upload to. #[arg(short, long)] resume: Option, + /// The number of concurrent uploads to perform. + #[arg(long, default_value = "3")] + concurrency: usize, }, } @@ -173,7 +176,19 @@ pub async fn exec( replace, dir, resume, - } => upload(env, canister, replace, dir, resume, call_sender).await?, + concurrency, + } => { + upload( + env, + canister, + replace, + dir, + resume, + concurrency, + call_sender, + ) + .await? 
+ } } Ok(()) } @@ -351,9 +366,9 @@ async fn download( BlobKind::WasmModule, metadata.wasm_module_size as usize, dir.join("wasm_module.bin"), - retry_policy.clone(), resume, concurrency, + retry_policy.clone(), call_sender, ) .await?; @@ -367,9 +382,9 @@ async fn download( BlobKind::MainMemory, metadata.wasm_memory_size as usize, dir.join("wasm_memory.bin"), - retry_policy.clone(), resume, concurrency, + retry_policy.clone(), call_sender, ) .await?; @@ -384,9 +399,9 @@ async fn download( BlobKind::StableMemory, metadata.stable_memory_size as usize, dir.join("stable_memory.bin"), - retry_policy.clone(), resume, concurrency, + retry_policy.clone(), call_sender, ) .await?; @@ -462,6 +477,7 @@ async fn upload( replace: Option, dir: PathBuf, resume: Option, + concurrency: usize, call_sender: &CallSender, ) -> DfxResult { let canister_id = canister @@ -528,6 +544,7 @@ async fn upload( BlobKind::WasmModule, dir.join("wasm_module.bin"), &mut upload_progress, + concurrency, retry_policy.clone(), call_sender, ) @@ -546,6 +563,7 @@ async fn upload( BlobKind::MainMemory, dir.join("wasm_memory.bin"), &mut upload_progress, + concurrency, retry_policy.clone(), call_sender, ) @@ -565,6 +583,7 @@ async fn upload( BlobKind::StableMemory, dir.join("stable_memory.bin"), &mut upload_progress, + concurrency, retry_policy.clone(), call_sender, ) @@ -717,7 +736,7 @@ impl SnapshotUploadProgress { } } -const MAX_CHUNK_SIZE: usize = 2_000_000; +const MAX_CHUNK_SIZE: usize = 200_000; async fn store_data( env: &dyn Environment, @@ -727,9 +746,9 @@ async fn store_data( blob_kind: BlobKind, length: usize, file_path: PathBuf, - retry_policy: ExponentialBackoff, resume: bool, concurrency: usize, + retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { let message = match blob_kind { @@ -815,8 +834,6 @@ async fn write_blob( while next_offset < length { // Schedule next chunk to download. - // Also check the length of ready_chunks to determine if we should schedule next chunk. - // For example, if the first trunk is blocked, continue to schedule trunks will increase the memory usage. while next_request_offset < length && in_progress.len() < concurrency && ready_chunks.len() < concurrency * 2 @@ -898,6 +915,7 @@ async fn upload_data( blob_kind: BlobKind, file_path: PathBuf, upload_progress: &mut SnapshotUploadProgress, + concurrency: usize, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { @@ -917,6 +935,7 @@ async fn upload_data( blob_kind, &file_path, progress, + concurrency, retry_policy.clone(), call_sender, ) @@ -941,6 +960,7 @@ async fn upload_blob( blob_kind: BlobKind, file_path: &PathBuf, upload_progress: &mut usize, + concurrency: usize, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { @@ -948,66 +968,103 @@ async fn upload_blob( .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? .len() as usize; - let mut offset = *upload_progress; - if offset > 0 && offset < length { + let offest = *upload_progress; + if offest >= length { + return Ok(()); + } + if offest > 0 { debug!( env.get_logger(), "Resuming uploading '{}' from offset {}", file_path.display(), - offset + offest ); } let pb = get_progress_bar(env, length as u64); - pb.set_position(offset as u64); + pb.set_position(offest as u64); + // Open the file once and read the needed chunks sequentially while overlapping network uploads. 
let mut file = tokio::fs::File::open(file_path) .await .with_context(|| format!("Failed to open file '{}' for reading", file_path.display()))?; - file.seek(SeekFrom::Start(offset as u64)).await?; - - while offset < length { - let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); - let kind = match blob_kind { - BlobKind::WasmModule => SnapshotDataOffset::WasmModule { - offset: offset as u64, - }, - BlobKind::MainMemory => SnapshotDataOffset::WasmMemory { - offset: offset as u64, - }, - BlobKind::StableMemory => SnapshotDataOffset::StableMemory { - offset: offset as u64, - }, - }; - let mut chunk = vec![0u8; chunk_size]; - file.read_exact(&mut chunk).await?; + let mut next_offset = offest; + let mut next_request_offset = offest; + let mut in_progress: FuturesUnordered<_> = FuturesUnordered::new(); + let mut completed: BTreeMap = BTreeMap::new(); - let data_args = UploadCanisterSnapshotDataArgs { - canister_id, - snapshot_id: snapshot.0.clone(), - kind, - chunk, - }; - retry(retry_policy.clone(), || async { - match upload_canister_snapshot_data(env, canister_id, &data_args, call_sender).await { - Ok(_) => Ok(()), - Err(error) if is_retryable(&error) => { - error!( - env.get_logger(), - "Failed to upload {:?} to snapshot {snapshot} in canister {canister}.", - blob_kind, - ); - Err(backoff::Error::transient(anyhow!(error))) - } - Err(error) => Err(backoff::Error::permanent(anyhow!(error))), - } - }) - .await?; - offset += chunk_size; - pb.set_position(offset as u64); + while next_offset < length { + // Schedule next chunk to upload. + while next_request_offset < length + && in_progress.len() < concurrency + && completed.len() < concurrency * 2 + { + let chunk_size = std::cmp::min(length - next_request_offset, MAX_CHUNK_SIZE); + let kind = match blob_kind { + BlobKind::WasmModule => SnapshotDataOffset::WasmModule { + offset: next_request_offset as u64, + }, + BlobKind::MainMemory => SnapshotDataOffset::WasmMemory { + offset: next_request_offset as u64, + }, + BlobKind::StableMemory => SnapshotDataOffset::StableMemory { + offset: next_request_offset as u64, + }, + }; + + // Read the bytes for this chunk. + let mut chunk = vec![0u8; chunk_size]; + file.seek(SeekFrom::Start(next_request_offset as u64)) + .await?; + file.read_exact(&mut chunk).await?; - *upload_progress = offset; + let data_args = UploadCanisterSnapshotDataArgs { + canister_id, + snapshot_id: snapshot.0.clone(), + kind, + chunk, + }; + let retry_policy = retry_policy.clone(); + + // Start upload for this chunk. + in_progress.push(async move { + retry(retry_policy, || async { + match upload_canister_snapshot_data(env, canister_id, &data_args, call_sender) + .await + { + Ok(()) => Ok(()), + Err(error) if is_retryable(&error) => { + error!( + env.get_logger(), + "Failed to upload {:?} to snapshot {snapshot} in canister {canister}.", + blob_kind, + ); + Err(backoff::Error::transient(anyhow!(error))) + } + Err(error) => Err(backoff::Error::permanent(anyhow!(error))), + } + }) + .await?; + + Ok::<(usize, usize), anyhow::Error>((next_request_offset, chunk_size)) + }); + + next_request_offset += chunk_size; + } + + // Collect finished uploads without awaiting when none are ready. + while let Some(Some(res)) = in_progress.next().now_or_never() { + let (uploaded_offset, uploaded_size) = res?; + completed.insert(uploaded_offset, uploaded_size); + } + + // Advance contiguous progress and update the progress bar. 
+ while let Some(size) = completed.remove(&next_offset) { + next_offset += size; + pb.set_position(next_offset as u64); + *upload_progress = next_offset; + } } pb.finish(); From 7d94221b1d3101d4c87ec510d84d04092e6543da Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Thu, 16 Oct 2025 21:57:18 +0800 Subject: [PATCH 11/27] Revert the testing chunk size. --- src/dfx/src/commands/canister/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index b844e14e0c..f7cfca5b9e 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -736,7 +736,7 @@ impl SnapshotUploadProgress { } } -const MAX_CHUNK_SIZE: usize = 200_000; +const MAX_CHUNK_SIZE: usize = 2_000_000; async fn store_data( env: &dyn Environment, From ddd984ffab15c7c732622e50102b56e5a42f38be Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Thu, 16 Oct 2025 22:19:43 +0800 Subject: [PATCH 12/27] Update changelog and document. --- CHANGELOG.md | 8 ++++++++ docs/cli-reference/dfx-canister.mdx | 24 +++++++++++++---------- src/dfx/src/commands/canister/snapshot.rs | 4 ++-- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a6f1bb423..1397f684ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ # UNRELEASED +### feat: improved the canister snapshot download/upload feature + +Improved the canister snapshot download/upload feature by +- adding progress bars to snapshot download/upload +- streaming snapshot download/upload directly to/from disk. +- supporting download/upload with resuming. +- supporting download/upload with concurrency, default to 3 tasks in parallel. + # 0.30.0 ### feat: `dfx start --system-canisters` for bootstrapping system canisters diff --git a/docs/cli-reference/dfx-canister.mdx b/docs/cli-reference/dfx-canister.mdx index 8597305d53..2033b71f1d 100644 --- a/docs/cli-reference/dfx-canister.mdx +++ b/docs/cli-reference/dfx-canister.mdx @@ -1028,11 +1028,13 @@ dfx canister snapshot download --dir You can use the following arguments with the `dfx canister snapshot download` command. -| Argument | Description | -|---------------|----------------------------------------------------------------------------| -| `` | The canister to download the snapshot from. | -| `` | The ID of the snapshot to download. | -| --dir `` | The directory to download the snapshot to. It should be created and empty. | +| Argument | Description | +|-------------------------------|--------------------------------------------------------------------------------------------| +| `` | The canister to download the snapshot from. | +| `` | The ID of the snapshot to download. | +| --dir `` | The directory to download the snapshot to. It should be created and empty if not resuming. | +| --resume | Whether to resume the download if the previous snapshot download failed. | +| --concurrency `` | The number of concurrent downloads to perform [default: 3]. | ### Examples @@ -1056,11 +1058,13 @@ dfx canister snapshot upload --dir You can use the following arguments with the `dfx canister snapshot upload` command. -| Argument | Description | -|-----------------------|--------------------------------------------------------------------------------| -| `` | The canister to upload the snapshot to. | -| --dir `` | The directory to upload the snapshot from. 
| -| --replace `` | If a snapshot ID is specified, the snapshot identified by this ID will be deleted and a snapshot with a new ID will be returned. | +| Argument | Description | +|-------------------------------|--------------------------------------------------------------------------------| +| `` | The canister to upload the snapshot to. | +| --dir `` | The directory to upload the snapshot from. | +| --replace `` | If a snapshot ID is specified, the snapshot identified by this ID will be deleted and a snapshot with a new ID will be returned. | +| --resume `` | The snapshot ID to resume uploading to. | +| --concurrency `` | The number of concurrent uploads to perform [default: 3]. | ### Examples diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index f7cfca5b9e..3b815fbc1a 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -88,7 +88,7 @@ enum SnapshotSubcommand { /// The directory to download the snapshot to. #[arg(long, value_parser = directory_parser)] dir: PathBuf, - /// Whether to resume the download if the snapshot already exists. + /// Whether to resume the download if the previous snapshot download failed. #[arg(short, long, default_value = "false")] resume: bool, /// The number of concurrent downloads to perform. @@ -105,7 +105,7 @@ enum SnapshotSubcommand { /// The directory to upload the snapshot from. #[arg(long, value_parser = directory_parser)] dir: PathBuf, - /// The snapshot ID to resume the upload to. + /// The snapshot ID to resume uploading to. #[arg(short, long)] resume: Option, /// The number of concurrent uploads to perform. From a083c518d5d687454fea8823a909086b827b3ea8 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Tue, 21 Oct 2025 12:48:56 +0800 Subject: [PATCH 13/27] Addressed review comments. --- src/dfx/src/commands/canister/snapshot.rs | 56 ++++++++++++++++------- 1 file changed, 40 insertions(+), 16 deletions(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 3b815fbc1a..e51968a3ca 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -680,7 +680,7 @@ fn write_upload_progress_file_on_error( } fn check_dir(dir: &PathBuf) -> DfxResult { - // Check if the directory is empty. + // Check if the directory is empty if not resuming. let mut entries = std::fs::read_dir(dir) .with_context(|| format!("Failed to read directory '{}'", dir.display()))?; if entries.next().is_some() { @@ -759,7 +759,7 @@ async fn store_data( debug!(env.get_logger(), "Downloading {message}"); - write_blob( + download_blob_to_file( env, canister, canister_id, @@ -786,7 +786,7 @@ async fn store_data( Ok(()) } -async fn write_blob( +async fn download_blob_to_file( env: &dyn Environment, canister: &str, canister_id: Principal, @@ -887,7 +887,7 @@ async fn write_blob( next_request_offset += chunk_size; } - // Process completed chunks. + // Process completed chunks if any are ready. while let Some(Some(res)) = in_progress.next().now_or_never() { let (chunk_offset, chunk) = res?; ready_chunks.insert(chunk_offset, chunk); @@ -900,6 +900,19 @@ async fn write_blob( pb.set_position(next_offset as u64); } } + + // Wait for any remaining downloads to complete. 
+ while let Some(res) = in_progress.next().await { + let (chunk_offset, chunk) = res?; + ready_chunks.insert(chunk_offset, chunk); + + while let Some(chunk) = ready_chunks.remove(&next_offset) { + file.write_all(&chunk).await?; + next_offset += chunk.len(); + pb.set_position(next_offset as u64); + } + } + file.flush().await?; pb.finish(); @@ -968,29 +981,30 @@ async fn upload_blob( .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? .len() as usize; - let offest = *upload_progress; - if offest >= length { + let offset = *upload_progress; + if offset >= length { return Ok(()); } - if offest > 0 { + if offset > 0 { debug!( env.get_logger(), "Resuming uploading '{}' from offset {}", file_path.display(), - offest + offset ); } let pb = get_progress_bar(env, length as u64); - pb.set_position(offest as u64); + pb.set_position(offset as u64); - // Open the file once and read the needed chunks sequentially while overlapping network uploads. + // Open the file once and seek to the offset. let mut file = tokio::fs::File::open(file_path) .await .with_context(|| format!("Failed to open file '{}' for reading", file_path.display()))?; + file.seek(SeekFrom::Start(offset as u64)).await?; - let mut next_offset = offest; - let mut next_request_offset = offest; + let mut next_offset = offset; + let mut next_request_offset = offset; let mut in_progress: FuturesUnordered<_> = FuturesUnordered::new(); let mut completed: BTreeMap = BTreeMap::new(); @@ -1015,8 +1029,6 @@ async fn upload_blob( // Read the bytes for this chunk. let mut chunk = vec![0u8; chunk_size]; - file.seek(SeekFrom::Start(next_request_offset as u64)) - .await?; file.read_exact(&mut chunk).await?; let data_args = UploadCanisterSnapshotDataArgs { @@ -1053,13 +1065,25 @@ async fn upload_blob( next_request_offset += chunk_size; } - // Collect finished uploads without awaiting when none are ready. + // Process completed uploads if any are ready. while let Some(Some(res)) = in_progress.next().now_or_never() { let (uploaded_offset, uploaded_size) = res?; completed.insert(uploaded_offset, uploaded_size); } - // Advance contiguous progress and update the progress bar. + // Advance and update the progress bar. + while let Some(size) = completed.remove(&next_offset) { + next_offset += size; + pb.set_position(next_offset as u64); + *upload_progress = next_offset; + } + } + + // Wait for any remaining uploads to complete. + while let Some(res) = in_progress.next().await { + let (uploaded_offset, uploaded_size) = res?; + completed.insert(uploaded_offset, uploaded_size); + while let Some(size) = completed.remove(&next_offset) { next_offset += size; pb.set_position(next_offset as u64); From 874253442e7a9d9e3b9a92306ce416cd3b9fff20 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 22 Oct 2025 14:35:36 +0800 Subject: [PATCH 14/27] Added a test for download/upload with latency, disabled it for now. 
--- e2e/tests-dfx/canister_extra.bash | 40 +++++++++++++++ e2e/utils/toxiproxy.bash | 84 +++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 e2e/utils/toxiproxy.bash diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index a1cd13e29d..fd565491e3 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -1,6 +1,7 @@ #!/usr/bin/env bats load ../utils/_ +load ../utils/toxiproxy setup() { standard_setup @@ -105,6 +106,45 @@ teardown() { assert_contains '(1 : nat)' } +@test "canister snapshots download and upload via toxiproxy with high latency" { + skip "Still failed when running in the e2e tests, also need to update CI to install toxiproxy." + + # Start the dfx server on a random port. + dfx_port=$(get_ephemeral_port) + dfx_start --host "127.0.0.1:$dfx_port" + + # Start toxiproxy and create a proxy. + toxiproxy_start + toxiproxy_create_proxy dfx_proxy "127.0.0.1:4843" "127.0.0.1:$dfx_port" + + install_asset counter + dfx deploy --no-wallet --network "http://127.0.0.1:4843" + + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:4843" + assert_contains '(1 : nat)' + + # Create a snapshot to download. + dfx canister stop hello_backend --network "http://127.0.0.1:4843" + assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:4843" + assert_match 'Snapshot ID: ([0-9a-f]+)' + snapshot=${BASH_REMATCH[1]} + echo "snapshot: $snapshot" + + # Add latency to the proxy. + toxiproxy_add_latency dfx_proxy 1500 300 + + # Download through the proxy with latency. + OUTPUT_DIR="output" + mkdir -p "$OUTPUT_DIR" + assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network http://127.0.0.1:4843 + assert_contains "saved to '$OUTPUT_DIR'" + + assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:4843" + assert_match 'Snapshot ID: ([0-9a-f]+)' + + toxiproxy_stop || true +} + @test "can query a website" { dfx_start diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash new file mode 100644 index 0000000000..953d59024e --- /dev/null +++ b/e2e/utils/toxiproxy.bash @@ -0,0 +1,84 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Helpers for toxiproxy to use toxiproxy-server and toxiproxy-cli + +: "${TOXIPROXY_HOST:=127.0.0.1}" +: "${TOXIPROXY_PORT:=8474}" + +_toxiproxy_cli() { + toxiproxy-cli -h "$TOXIPROXY_HOST" -p "$TOXIPROXY_PORT" "$@" +} + +# Check if toxiproxy server is running +toxiproxy_is_running() { + curl --silent --fail "http://${TOXIPROXY_HOST}:${TOXIPROXY_PORT}/version" >/dev/null 2>&1 +} + +# Start toxiproxy server +toxiproxy_start() { + if toxiproxy_is_running; then + return 0 + fi + + if ! command -v toxiproxy-server >/dev/null 2>&1; then + echo "toxiproxy-server not found in PATH" >&2 + return 1 + fi + + toxiproxy-server -host "$TOXIPROXY_HOST" -port "$TOXIPROXY_PORT" >/dev/null 2>&1 & + export E2E_TOXIPROXY_PID=$! 
+ + for _ in $(seq 1 50); do + if toxiproxy_is_running; then + return 0 + fi + sleep 0.1 + done + + echo "Toxiproxy server did not become available on ${TOXIPROXY_HOST}:${TOXIPROXY_PORT}" >&2 + return 1 +} + +# Stop toxiproxy server +toxiproxy_stop() { + if [ -n "${E2E_TOXIPROXY_PID:-}" ]; then + kill "$E2E_TOXIPROXY_PID" >/dev/null 2>&1 || true + unset E2E_TOXIPROXY_PID + fi +} + +# Create or replace a proxy +toxiproxy_create_proxy() { + local name=$1 listen=$2 upstream=$3 + _toxiproxy_cli delete "$name" >/dev/null 2>&1 || true + _toxiproxy_cli create "$name" -l "$listen" -u "$upstream" >/dev/null +} + +# Delete a proxy +toxiproxy_delete_proxy() { + local name=$1 + _toxiproxy_cli delete "$name" >/dev/null 2>&1 || true +} + +# Set a proxy to enabled or disabled +toxiproxy_set_enabled() { + local name=$1 enabled=$2 + if [ "$enabled" = "true" ]; then + _toxiproxy_cli enable "$name" >/dev/null + else + _toxiproxy_cli disable "$name" >/dev/null + fi +} + +# Add latency toxic (downstream) +toxiproxy_add_latency() { + local name=$1 latency=$2 jitter=${3:-0} + _toxiproxy_cli toxic add "$name" -t latency -a latency="$latency" -a jitter="$jitter" -d downstream >/dev/null +} + +# Add timeout toxic (downstream) +toxiproxy_add_timeout() { + local name=$1 timeout_ms=$2 + _toxiproxy_cli toxic add "$name" -t timeout -a timeout="$timeout_ms" -d downstream >/dev/null +} From 71f4625537adcb4c8ebd89995f1f92ea552a2453 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 22 Oct 2025 15:20:28 +0800 Subject: [PATCH 15/27] Make the test passed locally with toxiproxy used. --- e2e/tests-dfx/canister_extra.bash | 3 +-- e2e/utils/toxiproxy.bash | 24 +++++++++++++----------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index fd565491e3..9f07b6d370 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -107,7 +107,7 @@ teardown() { } @test "canister snapshots download and upload via toxiproxy with high latency" { - skip "Still failed when running in the e2e tests, also need to update CI to install toxiproxy." + skip "Need to update CI to install toxiproxy." # Start the dfx server on a random port. dfx_port=$(get_ephemeral_port) @@ -128,7 +128,6 @@ teardown() { assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:4843" assert_match 'Snapshot ID: ([0-9a-f]+)' snapshot=${BASH_REMATCH[1]} - echo "snapshot: $snapshot" # Add latency to the proxy. toxiproxy_add_latency dfx_proxy 1500 300 diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash index 953d59024e..2d00657b98 100644 --- a/e2e/utils/toxiproxy.bash +++ b/e2e/utils/toxiproxy.bash @@ -6,10 +6,6 @@ set -euo pipefail : "${TOXIPROXY_HOST:=127.0.0.1}" : "${TOXIPROXY_PORT:=8474}" -_toxiproxy_cli() { - toxiproxy-cli -h "$TOXIPROXY_HOST" -p "$TOXIPROXY_PORT" "$@" -} - # Check if toxiproxy server is running toxiproxy_is_running() { curl --silent --fail "http://${TOXIPROXY_HOST}:${TOXIPROXY_PORT}/version" >/dev/null 2>&1 @@ -51,34 +47,40 @@ toxiproxy_stop() { # Create or replace a proxy toxiproxy_create_proxy() { local name=$1 listen=$2 upstream=$3 - _toxiproxy_cli delete "$name" >/dev/null 2>&1 || true - _toxiproxy_cli create "$name" -l "$listen" -u "$upstream" >/dev/null + + # Ensure toxiproxy-cli is available + if ! 
command -v toxiproxy-cli >/dev/null 2>&1; then + echo "toxiproxy-cli not found in PATH" >&2 + return 1 + fi + + toxiproxy-cli create "$name" --listen "$listen" --upstream "$upstream" >/dev/null 2>&1 } # Delete a proxy toxiproxy_delete_proxy() { local name=$1 - _toxiproxy_cli delete "$name" >/dev/null 2>&1 || true + toxiproxy-cli delete "$name" >/dev/null 2>&1 || true } # Set a proxy to enabled or disabled toxiproxy_set_enabled() { local name=$1 enabled=$2 if [ "$enabled" = "true" ]; then - _toxiproxy_cli enable "$name" >/dev/null + toxiproxy-cli enable "$name" >/dev/null else - _toxiproxy_cli disable "$name" >/dev/null + toxiproxy-cli disable "$name" >/dev/null fi } # Add latency toxic (downstream) toxiproxy_add_latency() { local name=$1 latency=$2 jitter=${3:-0} - _toxiproxy_cli toxic add "$name" -t latency -a latency="$latency" -a jitter="$jitter" -d downstream >/dev/null + toxiproxy-cli toxic add "$name" -t latency -a latency="$latency" -a jitter="$jitter" -d >/dev/null } # Add timeout toxic (downstream) toxiproxy_add_timeout() { local name=$1 timeout_ms=$2 - _toxiproxy_cli toxic add "$name" -t timeout -a timeout="$timeout_ms" -d downstream >/dev/null + toxiproxy-cli toxic add "$name" -t timeout -a timeout="$timeout_ms" -d >/dev/null } From 6e987f89910a26701e025def3761131f0974b024 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 22 Oct 2025 18:33:23 +0800 Subject: [PATCH 16/27] Update CI to install toxiproxy for canister_extra tests. --- .github/workflows/e2e.yml | 26 ++++++++++++++++++++++++++ e2e/tests-dfx/canister_extra.bash | 2 -- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 609a9087c4..7bc0bdd435 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -172,6 +172,32 @@ jobs: echo "assets=$(pwd)/e2e/assets" >> "$GITHUB_ENV" echo "utils=$(pwd)/e2e/utils" >> "$GITHUB_ENV" export + - name: Install toxiproxy if needed + run: | + if [ "$E2E_TEST" = "tests-dfx/canister_extra.bash" ]; then + VERSION=v2.10.0 + OS=$(uname -s) + ARCH=$(uname -m) + + case "$OS" in + Linux) PLATFORM="linux" ;; + Darwin) PLATFORM="darwin" ;; + *) echo "Unsupported OS: $OS" >&2; exit 1 ;; + esac + + case "$ARCH" in + x86_64|amd64) ARCH_DL="amd64" ;; + arm64|aarch64) ARCH_DL="arm64" ;; + *) echo "Unsupported ARCH: $ARCH" >&2; exit 1 ;; + esac + + BASE_URL="https://github.com/Shopify/toxiproxy/releases/download/$VERSION" + curl -fsSL "$BASE_URL/toxiproxy-server-${PLATFORM}-${ARCH_DL}" -o toxiproxy-server + curl -fsSL "$BASE_URL/toxiproxy-cli-${PLATFORM}-${ARCH_DL}" -o toxiproxy-cli + chmod +x toxiproxy-server toxiproxy-cli + sudo mv toxiproxy-server /usr/local/bin/ + sudo mv toxiproxy-cli /usr/local/bin/ + fi - name: Download bats-support as a git submodule run: git submodule update --init --recursive - name: Cache mops files diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index 9f07b6d370..d51529be76 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -107,8 +107,6 @@ teardown() { } @test "canister snapshots download and upload via toxiproxy with high latency" { - skip "Need to update CI to install toxiproxy." - # Start the dfx server on a random port. dfx_port=$(get_ephemeral_port) dfx_start --host "127.0.0.1:$dfx_port" From d654ea0aa8f5143092af6b557315fa6828bea09e Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 22 Oct 2025 19:58:11 +0800 Subject: [PATCH 17/27] Added two e2e tests for canister snapshot. 
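
Besides the high-latency case, the second test interrupts a transfer mid-way and then
resumes it. The core sequence is roughly as follows (a sketch using the test's own proxy
name and port; `-r` resumes from the progress saved on disk):

    # Break the connection while the download is in flight, then recover.
    assert_command_fail timeout 10s dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port"
    toxiproxy_toggle_proxy dfx_proxy || true   # bring the proxy back up
    assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port"
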
--- e2e/tests-dfx/canister_extra.bash | 122 ++++++++++++++++++++-- e2e/utils/toxiproxy.bash | 11 +- src/dfx/src/commands/canister/snapshot.rs | 4 + 3 files changed, 122 insertions(+), 15 deletions(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index d51529be76..dec4b01b1d 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -113,17 +113,18 @@ teardown() { # Start toxiproxy and create a proxy. toxiproxy_start - toxiproxy_create_proxy dfx_proxy "127.0.0.1:4843" "127.0.0.1:$dfx_port" + proxy_port=$(get_ephemeral_port) + toxiproxy_create_proxy dfx_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" install_asset counter - dfx deploy --no-wallet --network "http://127.0.0.1:4843" + dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" - assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:4843" + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" assert_contains '(1 : nat)' # Create a snapshot to download. - dfx canister stop hello_backend --network "http://127.0.0.1:4843" - assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:4843" + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:$proxy_port" assert_match 'Snapshot ID: ([0-9a-f]+)' snapshot=${BASH_REMATCH[1]} @@ -133,11 +134,116 @@ teardown() { # Download through the proxy with latency. OUTPUT_DIR="output" mkdir -p "$OUTPUT_DIR" - assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network http://127.0.0.1:4843 + assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" assert_contains "saved to '$OUTPUT_DIR'" - assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:4843" + # Start the canister again. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(2 : nat)' + + # Upload the snapshot to create a new snapshot. + assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" assert_match 'Snapshot ID: ([0-9a-f]+)' + snapshot_1=${BASH_REMATCH[1]} + + # Stop the canister and load the new snapshot. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot load hello_backend "$snapshot_1" --network "http://127.0.0.1:$proxy_port" + + # Start the canister again and verify the loaded snapshot. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' + + toxiproxy_stop || true +} + +@test "canister snapshots download and upload via toxiproxy with network drop" { + # Start the dfx server on a random port. + dfx_port=$(get_ephemeral_port) + dfx_start --host "127.0.0.1:$dfx_port" + + # Start toxiproxy and create a proxy. 
+ toxiproxy_start + proxy_port=$(get_ephemeral_port) + toxiproxy_create_proxy dfx_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" + + install_asset counter + dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" + + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' + + # Create a snapshot to download. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:$proxy_port" + assert_match 'Snapshot ID: ([0-9a-f]+)' + snapshot=${BASH_REMATCH[1]} + + ( + # Toxiproxy doesn't support disabling the proxy with certain amount of data being transferred. + # So we roughly wait for 0.5 seconds to disable the proxy and fail the snapshot download. + sleep 0.5 + toxiproxy_toggle_proxy dfx_proxy || true + ) & + + # Download through the proxy. + OUTPUT_DIR="output" + mkdir -p "$OUTPUT_DIR" + assert_command_fail timeout 10s dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + + # For debugging. + echo "OUTPUT_DIR contents:" >&2 + find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 + + # Enable the proxy again. + toxiproxy_toggle_proxy dfx_proxy || true + + # Resume the download through the proxy. + assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" + assert_contains "saved to '$OUTPUT_DIR'" + + # For debugging. + echo "OUTPUT_DIR contents:" >&2 + find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 + + # Start the canister again. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(2 : nat)' + + ( + # Toxiproxy doesn't support disabling the proxy with certain amount of data being transferred. + # So we roughly wait for 0.5 seconds to disable the proxy and fail the snapshot upload. + sleep 0.5 + toxiproxy_toggle_proxy dfx_proxy || true + ) & + + # Upload the snapshot to create a new snapshot. + assert_command_fail timeout 10s dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + assert_match 'snapshot ([0-9a-f]+)' + snapshot_1=${BASH_REMATCH[1]} + + # For debugging. + echo "OUTPUT_DIR contents:" >&2 + find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 + + # Enable the proxy again. + toxiproxy_toggle_proxy dfx_proxy || true + + # Resume the upload through the proxy. + assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" -r "$snapshot_1" --network "http://127.0.0.1:$proxy_port" + assert_contains "$snapshot_1" + + # Stop the canister and load the new snapshot. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot load hello_backend "$snapshot_1" --network "http://127.0.0.1:$proxy_port" + + # Start the canister again and verify the loaded snapshot. 
+ dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' toxiproxy_stop || true } @@ -153,4 +259,4 @@ teardown() { assert_command dfx canister call e2e_project_backend get_url '("www.githubstatus.com:443","https://www.githubstatus.com:443")' assert_contains "Git Operations" assert_contains "API Requests" -} \ No newline at end of file +} diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash index 2d00657b98..e2d8cfae9d 100644 --- a/e2e/utils/toxiproxy.bash +++ b/e2e/utils/toxiproxy.bash @@ -54,6 +54,7 @@ toxiproxy_create_proxy() { return 1 fi + toxiproxy-cli delete "$name" >/dev/null 2>&1 toxiproxy-cli create "$name" --listen "$listen" --upstream "$upstream" >/dev/null 2>&1 } @@ -64,13 +65,9 @@ toxiproxy_delete_proxy() { } # Set a proxy to enabled or disabled -toxiproxy_set_enabled() { - local name=$1 enabled=$2 - if [ "$enabled" = "true" ]; then - toxiproxy-cli enable "$name" >/dev/null - else - toxiproxy-cli disable "$name" >/dev/null - fi +toxiproxy_toggle_proxy() { + local name=$1 + toxiproxy-cli toggle "$name" >/dev/null 2>&1 } # Add latency toxic (downstream) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index e51968a3ca..547ce7f349 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -522,6 +522,10 @@ async fn upload( .into(); upload_progress.snapshot_id = snapshot_id.to_string(); upload_progress.metadata_uploaded = true; + save_json_file( + &dir.join(format!("{}.json", upload_progress.snapshot_id)), + &upload_progress, + )?; debug!( env.get_logger(), From aa367111233b2f28f67467a269f7092297696c62 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 22 Oct 2025 21:05:17 +0800 Subject: [PATCH 18/27] Use toxiproxy-cli directly for debugging... --- e2e/tests-dfx/canister_extra.bash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index dec4b01b1d..6127ac065a 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -114,7 +114,7 @@ teardown() { # Start toxiproxy and create a proxy. toxiproxy_start proxy_port=$(get_ephemeral_port) - toxiproxy_create_proxy dfx_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" + toxiproxy-cli create dfx_proxy --listen "127.0.0.1:$proxy_port" --upstream "127.0.0.1:$dfx_port" install_asset counter dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" From 9ac3ccbb3f12a2fc55235e65258ced647c727325 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Wed, 22 Oct 2025 22:08:21 +0800 Subject: [PATCH 19/27] Add more debugging info... --- e2e/tests-dfx/canister_extra.bash | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index 6127ac065a..7846e34c70 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -109,12 +109,14 @@ teardown() { @test "canister snapshots download and upload via toxiproxy with high latency" { # Start the dfx server on a random port. dfx_port=$(get_ephemeral_port) + echo "dfx_port: $dfx_port" dfx_start --host "127.0.0.1:$dfx_port" # Start toxiproxy and create a proxy. 
toxiproxy_start proxy_port=$(get_ephemeral_port) - toxiproxy-cli create dfx_proxy --listen "127.0.0.1:$proxy_port" --upstream "127.0.0.1:$dfx_port" + echo "proxy_port: $proxy_port" + toxiproxy-cli create --listen "127.0.0.1:$proxy_port" --upstream "127.0.0.1:$dfx_port" dfx_proxy install_asset counter dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" From 0865e85c4cafacef5bff47ea4a967a5555eb2587 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Thu, 23 Oct 2025 19:46:08 +0800 Subject: [PATCH 20/27] Make the snapshot tests be able to run in parallel. --- e2e/tests-dfx/canister_extra.bash | 22 +++++++++++----------- e2e/utils/toxiproxy.bash | 16 +++++----------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index 7846e34c70..20695f2bd0 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -6,10 +6,12 @@ load ../utils/toxiproxy setup() { standard_setup dfx_new hello + toxiproxy_start } teardown() { dfx_stop + toxiproxy_stop || true standard_teardown } @@ -113,10 +115,9 @@ teardown() { dfx_start --host "127.0.0.1:$dfx_port" # Start toxiproxy and create a proxy. - toxiproxy_start proxy_port=$(get_ephemeral_port) echo "proxy_port: $proxy_port" - toxiproxy-cli create --listen "127.0.0.1:$proxy_port" --upstream "127.0.0.1:$dfx_port" dfx_proxy + toxiproxy_create_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" proxy_high_latency install_asset counter dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" @@ -131,7 +132,7 @@ teardown() { snapshot=${BASH_REMATCH[1]} # Add latency to the proxy. - toxiproxy_add_latency dfx_proxy 1500 300 + toxiproxy_add_latency 1500 300 proxy_high_latency # Download through the proxy with latency. OUTPUT_DIR="output" @@ -158,7 +159,7 @@ teardown() { assert_command dfx canister call hello_backend read --network "http://127.0.0.1:$proxy_port" assert_contains '(1 : nat)' - toxiproxy_stop || true + toxiproxy_delete_proxy proxy_high_latency } @test "canister snapshots download and upload via toxiproxy with network drop" { @@ -167,9 +168,8 @@ teardown() { dfx_start --host "127.0.0.1:$dfx_port" # Start toxiproxy and create a proxy. - toxiproxy_start proxy_port=$(get_ephemeral_port) - toxiproxy_create_proxy dfx_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" + toxiproxy_create_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" proxy_network_drop install_asset counter dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" @@ -187,7 +187,7 @@ teardown() { # Toxiproxy doesn't support disabling the proxy with certain amount of data being transferred. # So we roughly wait for 0.5 seconds to disable the proxy and fail the snapshot download. sleep 0.5 - toxiproxy_toggle_proxy dfx_proxy || true + toxiproxy_toggle_proxy proxy_network_drop ) & # Download through the proxy. @@ -200,7 +200,7 @@ teardown() { find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 # Enable the proxy again. - toxiproxy_toggle_proxy dfx_proxy || true + toxiproxy_toggle_proxy proxy_network_drop # Resume the download through the proxy. assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" @@ -219,7 +219,7 @@ teardown() { # Toxiproxy doesn't support disabling the proxy with certain amount of data being transferred. # So we roughly wait for 0.5 seconds to disable the proxy and fail the snapshot upload. 
sleep 0.5 - toxiproxy_toggle_proxy dfx_proxy || true + toxiproxy_toggle_proxy proxy_network_drop ) & # Upload the snapshot to create a new snapshot. @@ -232,7 +232,7 @@ teardown() { find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 # Enable the proxy again. - toxiproxy_toggle_proxy dfx_proxy || true + toxiproxy_toggle_proxy proxy_network_drop # Resume the upload through the proxy. assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" -r "$snapshot_1" --network "http://127.0.0.1:$proxy_port" @@ -247,7 +247,7 @@ teardown() { assert_command dfx canister call hello_backend read --network "http://127.0.0.1:$proxy_port" assert_contains '(1 : nat)' - toxiproxy_stop || true + toxiproxy_delete_proxy proxy_network_drop } @test "can query a website" { diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash index e2d8cfae9d..8112e2ae02 100644 --- a/e2e/utils/toxiproxy.bash +++ b/e2e/utils/toxiproxy.bash @@ -46,7 +46,7 @@ toxiproxy_stop() { # Create or replace a proxy toxiproxy_create_proxy() { - local name=$1 listen=$2 upstream=$3 + local listen=$1 upstream=$2 name=$3 # Ensure toxiproxy-cli is available if ! command -v toxiproxy-cli >/dev/null 2>&1; then @@ -54,8 +54,8 @@ toxiproxy_create_proxy() { return 1 fi - toxiproxy-cli delete "$name" >/dev/null 2>&1 - toxiproxy-cli create "$name" --listen "$listen" --upstream "$upstream" >/dev/null 2>&1 + toxiproxy-cli delete "$name" >/dev/null 2>&1 || true + toxiproxy-cli create --listen "$listen" --upstream "$upstream" "$name" >/dev/null 2>&1 } # Delete a proxy @@ -72,12 +72,6 @@ toxiproxy_toggle_proxy() { # Add latency toxic (downstream) toxiproxy_add_latency() { - local name=$1 latency=$2 jitter=${3:-0} - toxiproxy-cli toxic add "$name" -t latency -a latency="$latency" -a jitter="$jitter" -d >/dev/null -} - -# Add timeout toxic (downstream) -toxiproxy_add_timeout() { - local name=$1 timeout_ms=$2 - toxiproxy-cli toxic add "$name" -t timeout -a timeout="$timeout_ms" -d >/dev/null + local latency=$1 jitter=$2 name=$3 + toxiproxy-cli toxic add -t latency -a latency="$latency" -a jitter="$jitter" -d "$name" >/dev/null } From 2667fd710d263863d3ed2395937e6ad7a4a21830 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Thu, 23 Oct 2025 21:25:24 +0800 Subject: [PATCH 21/27] Add canister_extra as serial... --- scripts/workflows/e2e-matrix.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/workflows/e2e-matrix.py b/scripts/workflows/e2e-matrix.py index 9954db2d43..111eb93db7 100755 --- a/scripts/workflows/e2e-matrix.py +++ b/scripts/workflows/e2e-matrix.py @@ -7,7 +7,7 @@ SELECTED_TESTS = ["dfx/bitcoin", "dfx/canister_http_adapter", "dfx/start"] # Run these tests in serial -SERIAL_TESTS = ["dfx/start", "dfx/bitcoin", "dfx/cycles-ledger", "dfx/ledger", "dfx/serial_misc"] +SERIAL_TESTS = ["dfx/start", "dfx/bitcoin", "dfx/cycles-ledger", "dfx/ledger", "dfx/serial_misc", "dfx/canister_extra"] def test_scripts(prefix): all_files = os.listdir(f"e2e/tests-{prefix}") From 35b06c6e513162700c37e5664876b9f00bd942b1 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Fri, 24 Oct 2025 11:00:26 +0800 Subject: [PATCH 22/27] Moved toxiproxy installation into provision script. 
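
The install is keyed off E2E_TEST, so only the canister_extra job pays for it. To
sanity-check the provisioned binaries locally, something along these lines should work
(a sketch; port 8474 is the helper's default TOXIPROXY_PORT):

    command -v toxiproxy-server toxiproxy-cli
    toxiproxy-server -host 127.0.0.1 -port 8474 >/dev/null 2>&1 &
    sleep 1
    curl --silent --fail "http://127.0.0.1:8474/version"
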
--- .github/workflows/e2e.yml | 26 -------------------------- scripts/workflows/provision-linux.sh | 16 ++++++++++++++++ 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 7bc0bdd435..609a9087c4 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -172,32 +172,6 @@ jobs: echo "assets=$(pwd)/e2e/assets" >> "$GITHUB_ENV" echo "utils=$(pwd)/e2e/utils" >> "$GITHUB_ENV" export - - name: Install toxiproxy if needed - run: | - if [ "$E2E_TEST" = "tests-dfx/canister_extra.bash" ]; then - VERSION=v2.10.0 - OS=$(uname -s) - ARCH=$(uname -m) - - case "$OS" in - Linux) PLATFORM="linux" ;; - Darwin) PLATFORM="darwin" ;; - *) echo "Unsupported OS: $OS" >&2; exit 1 ;; - esac - - case "$ARCH" in - x86_64|amd64) ARCH_DL="amd64" ;; - arm64|aarch64) ARCH_DL="arm64" ;; - *) echo "Unsupported ARCH: $ARCH" >&2; exit 1 ;; - esac - - BASE_URL="https://github.com/Shopify/toxiproxy/releases/download/$VERSION" - curl -fsSL "$BASE_URL/toxiproxy-server-${PLATFORM}-${ARCH_DL}" -o toxiproxy-server - curl -fsSL "$BASE_URL/toxiproxy-cli-${PLATFORM}-${ARCH_DL}" -o toxiproxy-cli - chmod +x toxiproxy-server toxiproxy-cli - sudo mv toxiproxy-server /usr/local/bin/ - sudo mv toxiproxy-cli /usr/local/bin/ - fi - name: Download bats-support as a git submodule run: git submodule update --init --recursive - name: Cache mops files diff --git a/scripts/workflows/provision-linux.sh b/scripts/workflows/provision-linux.sh index 286c27d720..5751946b33 100755 --- a/scripts/workflows/provision-linux.sh +++ b/scripts/workflows/provision-linux.sh @@ -54,6 +54,22 @@ fi if [ "$E2E_TEST" = "tests-dfx/info.bash" ]; then sudo apt-get install --yes libarchive-zip-perl fi +if [ "$E2E_TEST" = "tests-dfx/canister_extra.bash" ]; then + VERSION=v2.10.0 + ARCH=$(uname -m) + case "$ARCH" in + x86_64|amd64) ARCH_DL="amd64" ;; + arm64|aarch64) ARCH_DL="arm64" ;; + *) echo "Unsupported ARCH: $ARCH" >&2; exit 1 ;; + esac + + BASE_URL="https://github.com/Shopify/toxiproxy/releases/download/$VERSION" + curl -fsSL "$BASE_URL/toxiproxy-server-linux-${ARCH_DL}" -o toxiproxy-server + curl -fsSL "$BASE_URL/toxiproxy-cli-linux-${ARCH_DL}" -o toxiproxy-cli + chmod +x toxiproxy-server toxiproxy-cli + sudo mv toxiproxy-server /usr/local/bin/ + sudo mv toxiproxy-cli /usr/local/bin/ +fi # Set environment variables. echo "$HOME/bin" >> "$GITHUB_PATH" From e5ae4eed6434876e6a948519858858acf3b2fc1b Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Fri, 24 Oct 2025 12:30:42 +0800 Subject: [PATCH 23/27] Make the canister snapshot network drop more robust... --- e2e/tests-dfx/canister_extra.bash | 34 +++++++++++------------ src/dfx/src/commands/canister/snapshot.rs | 26 ++++++----------- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index 20695f2bd0..07d6995c7c 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -111,12 +111,10 @@ teardown() { @test "canister snapshots download and upload via toxiproxy with high latency" { # Start the dfx server on a random port. dfx_port=$(get_ephemeral_port) - echo "dfx_port: $dfx_port" dfx_start --host "127.0.0.1:$dfx_port" # Start toxiproxy and create a proxy. 
proxy_port=$(get_ephemeral_port) - echo "proxy_port: $proxy_port" toxiproxy_create_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" proxy_high_latency install_asset counter @@ -190,15 +188,11 @@ teardown() { toxiproxy_toggle_proxy proxy_network_drop ) & - # Download through the proxy. + # Download through the proxy should fail. OUTPUT_DIR="output" mkdir -p "$OUTPUT_DIR" assert_command_fail timeout 10s dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" - # For debugging. - echo "OUTPUT_DIR contents:" >&2 - find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 - # Enable the proxy again. toxiproxy_toggle_proxy proxy_network_drop @@ -206,10 +200,6 @@ teardown() { assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" assert_contains "saved to '$OUTPUT_DIR'" - # For debugging. - echo "OUTPUT_DIR contents:" >&2 - find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 - # Start the canister again. dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" @@ -222,14 +212,22 @@ teardown() { toxiproxy_toggle_proxy proxy_network_drop ) & - # Upload the snapshot to create a new snapshot. + # Upload the snapshot should fail. assert_command_fail timeout 10s dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" - assert_match 'snapshot ([0-9a-f]+)' - snapshot_1=${BASH_REMATCH[1]} - - # For debugging. - echo "OUTPUT_DIR contents:" >&2 - find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 + + # Loop to find a .json filename in OUTPUT_DIR that matches ^[0-9a-f]+\.json$. + snapshot_1="" + while IFS= read -r json_file; do + [ -z "$json_file" ] && continue + if [[ "$json_file" =~ ^[0-9a-f]+\.json$ ]]; then + snapshot_1="${json_file%.json}" + break + fi + done < <(find "$OUTPUT_DIR" -maxdepth 1 -type f -name '*.json' -printf '%f\n') + if [ -z "$snapshot_1" ]; then + echo "No matching .json filename ([0-9a-f]+.json) found in $OUTPUT_DIR" >&2 + false + fi # Enable the proxy again. toxiproxy_toggle_proxy proxy_network_drop diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 547ce7f349..63b6e5ec21 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -539,7 +539,7 @@ async fn upload( let retry_policy = ExponentialBackoff::default(); // Upload Wasm module. - write_upload_progress_file_on_error( + write_upload_progress_file( upload_data( env, &canister, @@ -558,7 +558,7 @@ async fn upload( )?; // Upload Wasm memory. - write_upload_progress_file_on_error( + write_upload_progress_file( upload_data( env, &canister, @@ -578,7 +578,7 @@ async fn upload( // Upload stable memory. if metadata.stable_memory_size > 0 { - write_upload_progress_file_on_error( + write_upload_progress_file( upload_data( env, &canister, @@ -616,7 +616,7 @@ async fn upload( format!("Failed to read Wasm chunk from '{}'", chunk_file.display()) })?; - write_upload_progress_file_on_error( + write_upload_progress_file( retry(retry_policy.clone(), || async { let data_args = UploadCanisterSnapshotDataArgs { canister_id, @@ -660,26 +660,18 @@ async fn upload( "Uploaded a snapshot of canister {canister}. Snapshot ID: {}", snapshot_id ); - // Save the upload progress. 
- save_json_file( - &dir.join(format!("{}.json", upload_progress.snapshot_id)), - &upload_progress, - )?; - Ok(()) } -fn write_upload_progress_file_on_error( +fn write_upload_progress_file( result: DfxResult, upload_progress: &SnapshotUploadProgress, dir: &Path, ) -> DfxResult { - if result.is_err() { - save_json_file( - &dir.join(format!("{}.json", upload_progress.snapshot_id)), - &upload_progress, - )?; - } + save_json_file( + &dir.join(format!("{}.json", upload_progress.snapshot_id)), + &upload_progress, + )?; result } From fcbff22ca942c341ed66354929e61f3f0b3ca879 Mon Sep 17 00:00:00 2001 From: Vincent Zhang Date: Sun, 26 Oct 2025 19:50:09 +0800 Subject: [PATCH 24/27] Use limit_data instead. --- e2e/tests-dfx/canister_extra.bash | 38 ++++++++++++++----------------- e2e/utils/toxiproxy.bash | 12 ++++++++++ 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index 07d6995c7c..f5214ccdb6 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -181,20 +181,20 @@ teardown() { assert_match 'Snapshot ID: ([0-9a-f]+)' snapshot=${BASH_REMATCH[1]} - ( - # Toxiproxy doesn't support disabling the proxy with certain amount of data being transferred. - # So we roughly wait for 0.5 seconds to disable the proxy and fail the snapshot download. - sleep 0.5 - toxiproxy_toggle_proxy proxy_network_drop - ) & - - # Download through the proxy should fail. + # Add a 1MB limit_data toxic to force the snapshot download to fail. + toxiproxy_add_limit_data limit_download 1000000 proxy_network_drop + + # Download the snapshot should fail. OUTPUT_DIR="output" mkdir -p "$OUTPUT_DIR" - assert_command_fail timeout 10s dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + assert_command_fail timeout -s9 10s dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + + # For debugging. + echo "OUTPUT_DIR contents:" >&2 + find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 - # Enable the proxy again. - toxiproxy_toggle_proxy proxy_network_drop + # Remove the toxic. + toxiproxy_remove_toxic limit_download proxy_network_drop # Resume the download through the proxy. assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" @@ -205,17 +205,13 @@ teardown() { assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" assert_contains '(2 : nat)' - ( - # Toxiproxy doesn't support disabling the proxy with certain amount of data being transferred. - # So we roughly wait for 0.5 seconds to disable the proxy and fail the snapshot upload. - sleep 0.5 - toxiproxy_toggle_proxy proxy_network_drop - ) & + # Add a 1MB limit_data toxic to force the snapshot upload to fail. + toxiproxy_add_limit_data limit_upload 1000000 proxy_network_drop # Upload the snapshot should fail. - assert_command_fail timeout 10s dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + assert_command_fail timeout -s9 10s dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" - # Loop to find a .json filename in OUTPUT_DIR that matches ^[0-9a-f]+\.json$. + # Loop to get the snapshot id. 
snapshot_1="" while IFS= read -r json_file; do [ -z "$json_file" ] && continue @@ -229,8 +225,8 @@ teardown() { false fi - # Enable the proxy again. - toxiproxy_toggle_proxy proxy_network_drop + # Remove the toxic. + toxiproxy_remove_toxic limit_upload proxy_network_drop # Resume the upload through the proxy. assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" -r "$snapshot_1" --network "http://127.0.0.1:$proxy_port" diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash index 8112e2ae02..ab557ae071 100644 --- a/e2e/utils/toxiproxy.bash +++ b/e2e/utils/toxiproxy.bash @@ -75,3 +75,15 @@ toxiproxy_add_latency() { local latency=$1 jitter=$2 name=$3 toxiproxy-cli toxic add -t latency -a latency="$latency" -a jitter="$jitter" -d "$name" >/dev/null } + +# Add limit_data toxic (downstream) +toxiproxy_add_limit_data() { + local toxic_name=$1 bytes=$2 proxy_name=$3 + toxiproxy-cli toxic add -n "$toxic_name" -t limit_data -a bytes="$bytes" -d "$proxy_name" >/dev/null +} + +# Remove a toxic +toxiproxy_remove_toxic() { + local toxic_name=$1 proxy_name=$2 + toxiproxy-cli toxic remove -n "$toxic_name" -d "$proxy_name" >/dev/null 2>&1 || true +} \ No newline at end of file From 0df8dac000cba8c2cefb76a4f59150b6dcd1ae87 Mon Sep 17 00:00:00 2001 From: Adam Spofford Date: Tue, 28 Oct 2025 12:59:12 -0400 Subject: [PATCH 25/27] fix hanging test (and mac `find -printf`) --- e2e/tests-dfx/canister_extra.bash | 12 ++++++------ e2e/utils/toxiproxy.bash | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index f5214ccdb6..e51df2d2b8 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -159,7 +159,7 @@ teardown() { toxiproxy_delete_proxy proxy_high_latency } - +# bats test_tags=bats:focus @test "canister snapshots download and upload via toxiproxy with network drop" { # Start the dfx server on a random port. dfx_port=$(get_ephemeral_port) @@ -191,13 +191,13 @@ teardown() { # For debugging. echo "OUTPUT_DIR contents:" >&2 - find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -printf '%f\t%s\n' >&2 + find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -exec du -h {} \+ >&2 # Remove the toxic. toxiproxy_remove_toxic limit_download proxy_network_drop # Resume the download through the proxy. - assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" + assert_command dfx -v canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" assert_contains "saved to '$OUTPUT_DIR'" # Start the canister again. @@ -206,11 +206,11 @@ teardown() { assert_contains '(2 : nat)' # Add a 1MB limit_data toxic to force the snapshot upload to fail. - toxiproxy_add_limit_data limit_upload 1000000 proxy_network_drop + toxiproxy_add_limit_data limit_upload 1000000 proxy_network_drop -u # Upload the snapshot should fail. assert_command_fail timeout -s9 10s dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" - + # Loop to get the snapshot id. 
snapshot_1="" while IFS= read -r json_file; do @@ -219,7 +219,7 @@ teardown() { snapshot_1="${json_file%.json}" break fi - done < <(find "$OUTPUT_DIR" -maxdepth 1 -type f -name '*.json' -printf '%f\n') + done < <(find "$OUTPUT_DIR" -maxdepth 1 -type f -name '*.json' -exec basename {} \;) if [ -z "$snapshot_1" ]; then echo "No matching .json filename ([0-9a-f]+.json) found in $OUTPUT_DIR" >&2 false diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash index ab557ae071..e9f1150df4 100644 --- a/e2e/utils/toxiproxy.bash +++ b/e2e/utils/toxiproxy.bash @@ -22,7 +22,7 @@ toxiproxy_start() { return 1 fi - toxiproxy-server -host "$TOXIPROXY_HOST" -port "$TOXIPROXY_PORT" >/dev/null 2>&1 & + toxiproxy-server -host "$TOXIPROXY_HOST" -port "$TOXIPROXY_PORT" >/dev/null 2>&1 3>&- & export E2E_TOXIPROXY_PID=$! for _ in $(seq 1 50); do @@ -78,12 +78,12 @@ toxiproxy_add_latency() { # Add limit_data toxic (downstream) toxiproxy_add_limit_data() { - local toxic_name=$1 bytes=$2 proxy_name=$3 - toxiproxy-cli toxic add -n "$toxic_name" -t limit_data -a bytes="$bytes" -d "$proxy_name" >/dev/null + local toxic_name=$1 bytes=$2 proxy_name=$3 direction=${4:-"-d"} + toxiproxy-cli toxic add -n "$toxic_name" -t limit_data -a bytes="$bytes" ${direction:+"$direction"} "$proxy_name" >/dev/null } # Remove a toxic toxiproxy_remove_toxic() { local toxic_name=$1 proxy_name=$2 - toxiproxy-cli toxic remove -n "$toxic_name" -d "$proxy_name" >/dev/null 2>&1 || true -} \ No newline at end of file + toxiproxy-cli toxic remove -n "$toxic_name" "$proxy_name" >/dev/null 2>&1 || true +} From d155edfdfa38064b14da87ed4df759afbc9cec8e Mon Sep 17 00:00:00 2001 From: Adam Spofford Date: Tue, 28 Oct 2025 12:59:32 -0400 Subject: [PATCH 26/27] remove focus tag --- e2e/tests-dfx/canister_extra.bash | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index e51df2d2b8..0d596d4d4b 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -159,7 +159,7 @@ teardown() { toxiproxy_delete_proxy proxy_high_latency } -# bats test_tags=bats:focus + @test "canister snapshots download and upload via toxiproxy with network drop" { # Start the dfx server on a random port. dfx_port=$(get_ephemeral_port) From dcf1ce32811caf6db98325e04868cd0538c6264a Mon Sep 17 00:00:00 2001 From: Adam Spofford Date: Tue, 28 Oct 2025 13:40:15 -0400 Subject: [PATCH 27/27] Add mention of `--resume` to error message --- src/dfx/src/commands/canister/snapshot.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 63b6e5ec21..05fb75cc4b 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -680,7 +680,10 @@ fn check_dir(dir: &PathBuf) -> DfxResult { let mut entries = std::fs::read_dir(dir) .with_context(|| format!("Failed to read directory '{}'", dir.display()))?; if entries.next().is_some() { - bail!("Directory '{}' is not empty", dir.display()); + bail!( + "Directory '{}' is not empty (to resume an aborted download, use `--resume`)", + dir.display() + ); } // Check if the directory is writable.