diff --git a/CHANGELOG.md b/CHANGELOG.md index 6976aac526..fe3657e6bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ # UNRELEASED +### feat: improved the canister snapshot download/upload feature + +Improved the canister snapshot download/upload feature by +- adding progress bars to snapshot download/upload +- streaming snapshot download/upload directly to/from disk. +- supporting download/upload with resuming. +- supporting download/upload with concurrency, default to 3 tasks in parallel. + # 0.30.0 ### feat: `dfx start --system-canisters` for bootstrapping system canisters diff --git a/docs/cli-reference/dfx-canister.mdx b/docs/cli-reference/dfx-canister.mdx index 8597305d53..2033b71f1d 100644 --- a/docs/cli-reference/dfx-canister.mdx +++ b/docs/cli-reference/dfx-canister.mdx @@ -1028,11 +1028,13 @@ dfx canister snapshot download --dir You can use the following arguments with the `dfx canister snapshot download` command. -| Argument | Description | -|---------------|----------------------------------------------------------------------------| -| `` | The canister to download the snapshot from. | -| `` | The ID of the snapshot to download. | -| --dir `` | The directory to download the snapshot to. It should be created and empty. | +| Argument | Description | +|-------------------------------|--------------------------------------------------------------------------------------------| +| `` | The canister to download the snapshot from. | +| `` | The ID of the snapshot to download. | +| --dir `` | The directory to download the snapshot to. It should be created and empty if not resuming. | +| --resume | Whether to resume the download if the previous snapshot download failed. | +| --concurrency `` | The number of concurrent downloads to perform [default: 3]. | ### Examples @@ -1056,11 +1058,13 @@ dfx canister snapshot upload --dir You can use the following arguments with the `dfx canister snapshot upload` command. -| Argument | Description | -|-----------------------|--------------------------------------------------------------------------------| -| `` | The canister to upload the snapshot to. | -| --dir `` | The directory to upload the snapshot from. | -| --replace `` | If a snapshot ID is specified, the snapshot identified by this ID will be deleted and a snapshot with a new ID will be returned. | +| Argument | Description | +|-------------------------------|--------------------------------------------------------------------------------| +| `` | The canister to upload the snapshot to. | +| --dir `` | The directory to upload the snapshot from. | +| --replace `` | If a snapshot ID is specified, the snapshot identified by this ID will be deleted and a snapshot with a new ID will be returned. | +| --resume `` | The snapshot ID to resume uploading to. | +| --concurrency `` | The number of concurrent uploads to perform [default: 3]. | ### Examples diff --git a/e2e/tests-dfx/canister_extra.bash b/e2e/tests-dfx/canister_extra.bash index a1cd13e29d..0d596d4d4b 100755 --- a/e2e/tests-dfx/canister_extra.bash +++ b/e2e/tests-dfx/canister_extra.bash @@ -1,14 +1,17 @@ #!/usr/bin/env bats load ../utils/_ +load ../utils/toxiproxy setup() { standard_setup dfx_new hello + toxiproxy_start } teardown() { dfx_stop + toxiproxy_stop || true standard_teardown } @@ -105,6 +108,142 @@ teardown() { assert_contains '(1 : nat)' } +@test "canister snapshots download and upload via toxiproxy with high latency" { + # Start the dfx server on a random port. + dfx_port=$(get_ephemeral_port) + dfx_start --host "127.0.0.1:$dfx_port" + + # Start toxiproxy and create a proxy. + proxy_port=$(get_ephemeral_port) + toxiproxy_create_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" proxy_high_latency + + install_asset counter + dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" + + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' + + # Create a snapshot to download. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:$proxy_port" + assert_match 'Snapshot ID: ([0-9a-f]+)' + snapshot=${BASH_REMATCH[1]} + + # Add latency to the proxy. + toxiproxy_add_latency 1500 300 proxy_high_latency + + # Download through the proxy with latency. + OUTPUT_DIR="output" + mkdir -p "$OUTPUT_DIR" + assert_command dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + assert_contains "saved to '$OUTPUT_DIR'" + + # Start the canister again. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(2 : nat)' + + # Upload the snapshot to create a new snapshot. + assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + assert_match 'Snapshot ID: ([0-9a-f]+)' + snapshot_1=${BASH_REMATCH[1]} + + # Stop the canister and load the new snapshot. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot load hello_backend "$snapshot_1" --network "http://127.0.0.1:$proxy_port" + + # Start the canister again and verify the loaded snapshot. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' + + toxiproxy_delete_proxy proxy_high_latency +} + +@test "canister snapshots download and upload via toxiproxy with network drop" { + # Start the dfx server on a random port. + dfx_port=$(get_ephemeral_port) + dfx_start --host "127.0.0.1:$dfx_port" + + # Start toxiproxy and create a proxy. + proxy_port=$(get_ephemeral_port) + toxiproxy_create_proxy "127.0.0.1:$proxy_port" "127.0.0.1:$dfx_port" proxy_network_drop + + install_asset counter + dfx deploy --no-wallet --network "http://127.0.0.1:$proxy_port" + + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' + + # Create a snapshot to download. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot create hello_backend --network "http://127.0.0.1:$proxy_port" + assert_match 'Snapshot ID: ([0-9a-f]+)' + snapshot=${BASH_REMATCH[1]} + + # Add a 1MB limit_data toxic to force the snapshot download to fail. + toxiproxy_add_limit_data limit_download 1000000 proxy_network_drop + + # Download the snapshot should fail. + OUTPUT_DIR="output" + mkdir -p "$OUTPUT_DIR" + assert_command_fail timeout -s9 10s dfx canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + + # For debugging. + echo "OUTPUT_DIR contents:" >&2 + find "$OUTPUT_DIR" -maxdepth 1 -mindepth 1 -type f -exec du -h {} \+ >&2 + + # Remove the toxic. + toxiproxy_remove_toxic limit_download proxy_network_drop + + # Resume the download through the proxy. + assert_command dfx -v canister snapshot download hello_backend "$snapshot" --dir "$OUTPUT_DIR" -r --network "http://127.0.0.1:$proxy_port" + assert_contains "saved to '$OUTPUT_DIR'" + + # Start the canister again. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend inc_read --network "http://127.0.0.1:$proxy_port" + assert_contains '(2 : nat)' + + # Add a 1MB limit_data toxic to force the snapshot upload to fail. + toxiproxy_add_limit_data limit_upload 1000000 proxy_network_drop -u + + # Upload the snapshot should fail. + assert_command_fail timeout -s9 10s dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" --network "http://127.0.0.1:$proxy_port" + + # Loop to get the snapshot id. + snapshot_1="" + while IFS= read -r json_file; do + [ -z "$json_file" ] && continue + if [[ "$json_file" =~ ^[0-9a-f]+\.json$ ]]; then + snapshot_1="${json_file%.json}" + break + fi + done < <(find "$OUTPUT_DIR" -maxdepth 1 -type f -name '*.json' -exec basename {} \;) + if [ -z "$snapshot_1" ]; then + echo "No matching .json filename ([0-9a-f]+.json) found in $OUTPUT_DIR" >&2 + false + fi + + # Remove the toxic. + toxiproxy_remove_toxic limit_upload proxy_network_drop + + # Resume the upload through the proxy. + assert_command dfx canister snapshot upload hello_backend --dir "$OUTPUT_DIR" -r "$snapshot_1" --network "http://127.0.0.1:$proxy_port" + assert_contains "$snapshot_1" + + # Stop the canister and load the new snapshot. + dfx canister stop hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister snapshot load hello_backend "$snapshot_1" --network "http://127.0.0.1:$proxy_port" + + # Start the canister again and verify the loaded snapshot. + dfx canister start hello_backend --network "http://127.0.0.1:$proxy_port" + assert_command dfx canister call hello_backend read --network "http://127.0.0.1:$proxy_port" + assert_contains '(1 : nat)' + + toxiproxy_delete_proxy proxy_network_drop +} + @test "can query a website" { dfx_start @@ -116,4 +255,4 @@ teardown() { assert_command dfx canister call e2e_project_backend get_url '("www.githubstatus.com:443","https://www.githubstatus.com:443")' assert_contains "Git Operations" assert_contains "API Requests" -} \ No newline at end of file +} diff --git a/e2e/utils/toxiproxy.bash b/e2e/utils/toxiproxy.bash new file mode 100644 index 0000000000..e9f1150df4 --- /dev/null +++ b/e2e/utils/toxiproxy.bash @@ -0,0 +1,89 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Helpers for toxiproxy to use toxiproxy-server and toxiproxy-cli + +: "${TOXIPROXY_HOST:=127.0.0.1}" +: "${TOXIPROXY_PORT:=8474}" + +# Check if toxiproxy server is running +toxiproxy_is_running() { + curl --silent --fail "http://${TOXIPROXY_HOST}:${TOXIPROXY_PORT}/version" >/dev/null 2>&1 +} + +# Start toxiproxy server +toxiproxy_start() { + if toxiproxy_is_running; then + return 0 + fi + + if ! command -v toxiproxy-server >/dev/null 2>&1; then + echo "toxiproxy-server not found in PATH" >&2 + return 1 + fi + + toxiproxy-server -host "$TOXIPROXY_HOST" -port "$TOXIPROXY_PORT" >/dev/null 2>&1 3>&- & + export E2E_TOXIPROXY_PID=$! + + for _ in $(seq 1 50); do + if toxiproxy_is_running; then + return 0 + fi + sleep 0.1 + done + + echo "Toxiproxy server did not become available on ${TOXIPROXY_HOST}:${TOXIPROXY_PORT}" >&2 + return 1 +} + +# Stop toxiproxy server +toxiproxy_stop() { + if [ -n "${E2E_TOXIPROXY_PID:-}" ]; then + kill "$E2E_TOXIPROXY_PID" >/dev/null 2>&1 || true + unset E2E_TOXIPROXY_PID + fi +} + +# Create or replace a proxy +toxiproxy_create_proxy() { + local listen=$1 upstream=$2 name=$3 + + # Ensure toxiproxy-cli is available + if ! command -v toxiproxy-cli >/dev/null 2>&1; then + echo "toxiproxy-cli not found in PATH" >&2 + return 1 + fi + + toxiproxy-cli delete "$name" >/dev/null 2>&1 || true + toxiproxy-cli create --listen "$listen" --upstream "$upstream" "$name" >/dev/null 2>&1 +} + +# Delete a proxy +toxiproxy_delete_proxy() { + local name=$1 + toxiproxy-cli delete "$name" >/dev/null 2>&1 || true +} + +# Set a proxy to enabled or disabled +toxiproxy_toggle_proxy() { + local name=$1 + toxiproxy-cli toggle "$name" >/dev/null 2>&1 +} + +# Add latency toxic (downstream) +toxiproxy_add_latency() { + local latency=$1 jitter=$2 name=$3 + toxiproxy-cli toxic add -t latency -a latency="$latency" -a jitter="$jitter" -d "$name" >/dev/null +} + +# Add limit_data toxic (downstream) +toxiproxy_add_limit_data() { + local toxic_name=$1 bytes=$2 proxy_name=$3 direction=${4:-"-d"} + toxiproxy-cli toxic add -n "$toxic_name" -t limit_data -a bytes="$bytes" ${direction:+"$direction"} "$proxy_name" >/dev/null +} + +# Remove a toxic +toxiproxy_remove_toxic() { + local toxic_name=$1 proxy_name=$2 + toxiproxy-cli toxic remove -n "$toxic_name" "$proxy_name" >/dev/null 2>&1 || true +} diff --git a/scripts/workflows/e2e-matrix.py b/scripts/workflows/e2e-matrix.py index 9954db2d43..111eb93db7 100755 --- a/scripts/workflows/e2e-matrix.py +++ b/scripts/workflows/e2e-matrix.py @@ -7,7 +7,7 @@ SELECTED_TESTS = ["dfx/bitcoin", "dfx/canister_http_adapter", "dfx/start"] # Run these tests in serial -SERIAL_TESTS = ["dfx/start", "dfx/bitcoin", "dfx/cycles-ledger", "dfx/ledger", "dfx/serial_misc"] +SERIAL_TESTS = ["dfx/start", "dfx/bitcoin", "dfx/cycles-ledger", "dfx/ledger", "dfx/serial_misc", "dfx/canister_extra"] def test_scripts(prefix): all_files = os.listdir(f"e2e/tests-{prefix}") diff --git a/scripts/workflows/provision-linux.sh b/scripts/workflows/provision-linux.sh index 286c27d720..5751946b33 100755 --- a/scripts/workflows/provision-linux.sh +++ b/scripts/workflows/provision-linux.sh @@ -54,6 +54,22 @@ fi if [ "$E2E_TEST" = "tests-dfx/info.bash" ]; then sudo apt-get install --yes libarchive-zip-perl fi +if [ "$E2E_TEST" = "tests-dfx/canister_extra.bash" ]; then + VERSION=v2.10.0 + ARCH=$(uname -m) + case "$ARCH" in + x86_64|amd64) ARCH_DL="amd64" ;; + arm64|aarch64) ARCH_DL="arm64" ;; + *) echo "Unsupported ARCH: $ARCH" >&2; exit 1 ;; + esac + + BASE_URL="https://github.com/Shopify/toxiproxy/releases/download/$VERSION" + curl -fsSL "$BASE_URL/toxiproxy-server-linux-${ARCH_DL}" -o toxiproxy-server + curl -fsSL "$BASE_URL/toxiproxy-cli-linux-${ARCH_DL}" -o toxiproxy-cli + chmod +x toxiproxy-server toxiproxy-cli + sudo mv toxiproxy-server /usr/local/bin/ + sudo mv toxiproxy-cli /usr/local/bin/ +fi # Set environment variables. echo "$HOME/bin" >> "$GITHUB_PATH" diff --git a/src/dfx/src/commands/canister/snapshot.rs b/src/dfx/src/commands/canister/snapshot.rs index 107bf9c405..05fb75cc4b 100644 --- a/src/dfx/src/commands/canister/snapshot.rs +++ b/src/dfx/src/commands/canister/snapshot.rs @@ -1,6 +1,8 @@ use std::{ + collections::{BTreeMap, HashMap}, fmt::{self, Display, Formatter}, - path::PathBuf, + io::SeekFrom, + path::{Path, PathBuf}, str::FromStr, }; @@ -13,16 +15,19 @@ use dfx_core::{ identity::CallSender, json::{load_json_file, save_json_file}, }; +use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use ic_management_canister_types::{ CanisterStatusType, LoadCanisterSnapshotArgs, ReadCanisterSnapshotDataArgs, ReadCanisterSnapshotMetadataArgs, ReadCanisterSnapshotMetadataResult, SnapshotDataKind, SnapshotDataOffset, UploadCanisterSnapshotDataArgs, UploadCanisterSnapshotMetadataArgs, UploadCanisterSnapshotMetadataResult, }; -use indicatif::HumanBytes; +use indicatif::{HumanBytes, ProgressStyle}; use itertools::Itertools; +use serde::{Deserialize, Serialize}; use slog::{debug, error, info}; use time::{OffsetDateTime, macros::format_description}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use crate::lib::{ environment::Environment, @@ -32,6 +37,7 @@ use crate::lib::{ load_canister_snapshot, read_canister_snapshot_data, read_canister_snapshot_metadata, take_canister_snapshot, upload_canister_snapshot_data, upload_canister_snapshot_metadata, }, + progress_bar::ProgressBar, retryable::retryable, root_key::fetch_root_key_if_needed, }; @@ -82,6 +88,12 @@ enum SnapshotSubcommand { /// The directory to download the snapshot to. #[arg(long, value_parser = directory_parser)] dir: PathBuf, + /// Whether to resume the download if the previous snapshot download failed. + #[arg(short, long, default_value = "false")] + resume: bool, + /// The number of concurrent downloads to perform. + #[arg(long, default_value = "3")] + concurrency: usize, }, /// Uploads a downloaded snapshot from a given directory to a canister. Upload { @@ -93,10 +105,16 @@ enum SnapshotSubcommand { /// The directory to upload the snapshot from. #[arg(long, value_parser = directory_parser)] dir: PathBuf, + /// The snapshot ID to resume uploading to. + #[arg(short, long)] + resume: Option, + /// The number of concurrent uploads to perform. + #[arg(long, default_value = "3")] + concurrency: usize, }, } -#[derive(Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct SnapshotId(Vec); impl Display for SnapshotId { @@ -139,12 +157,38 @@ pub async fn exec( canister, snapshot, dir, - } => download(env, canister, snapshot, dir, call_sender).await?, + resume, + concurrency, + } => { + download( + env, + canister, + snapshot, + dir, + resume, + concurrency, + call_sender, + ) + .await? + } SnapshotSubcommand::Upload { canister, replace, dir, - } => upload(env, canister, replace, dir, call_sender).await?, + resume, + concurrency, + } => { + upload( + env, + canister, + replace, + dir, + resume, + concurrency, + call_sender, + ) + .await? + } } Ok(()) } @@ -273,31 +317,43 @@ async fn download( canister: String, snapshot: SnapshotId, dir: PathBuf, + resume: bool, + concurrency: usize, call_sender: &CallSender, ) -> DfxResult { - check_dir(&dir)?; + if !resume { + check_dir(&dir)?; + } let canister_id = canister .parse() .or_else(|_| env.get_canister_id_store()?.get(&canister))?; // Store metadata. - let metadata_args = ReadCanisterSnapshotMetadataArgs { - canister_id, - snapshot_id: snapshot.0.clone(), - }; - let metadata = read_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) - .await - .with_context(|| { - format!("Failed to read metadata from snapshot {snapshot} in canister {canister}") - })?; let metadata_file = dir.join("metadata.json"); - save_json_file(&metadata_file, &metadata)?; - debug!( - env.get_logger(), - "Snapshot metadata saved to '{}'", - metadata_file.display() - ); + let metadata = if !metadata_file.exists() { + let metadata_args = ReadCanisterSnapshotMetadataArgs { + canister_id, + snapshot_id: snapshot.0.clone(), + }; + let metadata = + read_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) + .await + .with_context(|| { + format!( + "Failed to read metadata from snapshot {snapshot} in canister {canister}" + ) + })?; + save_json_file(&metadata_file, &metadata)?; + debug!( + env.get_logger(), + "Snapshot metadata saved to '{}'", + metadata_file.display() + ); + metadata + } else { + load_json_file(&metadata_file)? + }; let retry_policy = ExponentialBackoff::default(); @@ -310,6 +366,8 @@ async fn download( BlobKind::WasmModule, metadata.wasm_module_size as usize, dir.join("wasm_module.bin"), + resume, + concurrency, retry_policy.clone(), call_sender, ) @@ -324,6 +382,8 @@ async fn download( BlobKind::MainMemory, metadata.wasm_memory_size as usize, dir.join("wasm_memory.bin"), + resume, + concurrency, retry_policy.clone(), call_sender, ) @@ -339,6 +399,8 @@ async fn download( BlobKind::StableMemory, metadata.stable_memory_size as usize, dir.join("stable_memory.bin"), + resume, + concurrency, retry_policy.clone(), call_sender, ) @@ -348,16 +410,21 @@ async fn download( // Store Wasm chunks. if !metadata.wasm_chunk_store.is_empty() { let wasm_chunk_store_dir = dir.join("wasm_chunk_store"); - std::fs::create_dir(&wasm_chunk_store_dir).with_context(|| { - format!( - "Failed to create directory '{}'", - wasm_chunk_store_dir.display() - ) - })?; + if !wasm_chunk_store_dir.exists() { + std::fs::create_dir(&wasm_chunk_store_dir).with_context(|| { + format!( + "Failed to create directory '{}'", + wasm_chunk_store_dir.display() + ) + })?; + } for chunk_hash in metadata.wasm_chunk_store { let hash_str = hex::encode(&chunk_hash.hash); let chunk_file = wasm_chunk_store_dir.join(format!("{hash_str}.bin")); + if chunk_file.exists() { + continue; + } let chunk = retry(retry_policy.clone(), || async { let data_args = ReadCanisterSnapshotDataArgs { @@ -409,76 +476,125 @@ async fn upload( canister: String, replace: Option, dir: PathBuf, + resume: Option, + concurrency: usize, call_sender: &CallSender, ) -> DfxResult { let canister_id = canister .parse() .or_else(|_| env.get_canister_id_store()?.get(&canister))?; - // Upload snapshot metadata. + // Load the upload progress if provided. + let mut upload_progress = SnapshotUploadProgress::new(); + if let Some(resume_snapshot_id) = &resume { + let progress_path = dir.join(format!("{resume_snapshot_id}.json")); + if progress_path.exists() { + upload_progress = load_json_file(&progress_path)?; + } else { + bail!( + "Cannot resume uploading to snapshot {} because the progress file was not found in directory '{}'", + resume_snapshot_id, + dir.display() + ); + } + } + let metadata: ReadCanisterSnapshotMetadataResult = load_json_file(&dir.join("metadata.json"))?; - let metadata_args = UploadCanisterSnapshotMetadataArgs { - canister_id, - replace_snapshot: replace.as_ref().map(|x| x.0.clone()), - wasm_module_size: metadata.wasm_module_size, - globals: metadata.globals, - wasm_memory_size: metadata.wasm_memory_size, - stable_memory_size: metadata.stable_memory_size, - certified_data: metadata.certified_data, - global_timer: metadata.global_timer, - on_low_wasm_memory_hook_status: metadata.on_low_wasm_memory_hook_status, + let snapshot_id = if !upload_progress.metadata_uploaded { + // Upload snapshot metadata. + let metadata_args = UploadCanisterSnapshotMetadataArgs { + canister_id, + replace_snapshot: replace.as_ref().map(|x| x.0.clone()), + wasm_module_size: metadata.wasm_module_size, + globals: metadata.globals, + wasm_memory_size: metadata.wasm_memory_size, + stable_memory_size: metadata.stable_memory_size, + certified_data: metadata.certified_data, + global_timer: metadata.global_timer, + on_low_wasm_memory_hook_status: metadata.on_low_wasm_memory_hook_status, + }; + let snapshot_id: SnapshotId = + upload_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) + .await + .with_context(|| { + format!("Failed to upload snapshot metadata to canister {canister}") + })? + .into(); + upload_progress.snapshot_id = snapshot_id.to_string(); + upload_progress.metadata_uploaded = true; + save_json_file( + &dir.join(format!("{}.json", upload_progress.snapshot_id)), + &upload_progress, + )?; + + debug!( + env.get_logger(), + "Snapshot metadata uploaded to canister {canister} with Snapshot ID: {snapshot_id}", + ); + snapshot_id + } else { + SnapshotId::from_str(&upload_progress.snapshot_id)? }; - let snapshot_id = - upload_canister_snapshot_metadata(env, canister_id, &metadata_args, call_sender) - .await - .with_context(|| format!("Failed to upload snapshot metadata to canister {canister}"))? - .into(); - debug!( - env.get_logger(), - "Snapshot metadata uploaded to canister {canister} with Snapshot ID: {snapshot_id}" - ); let retry_policy = ExponentialBackoff::default(); // Upload Wasm module. - upload_data( - env, - &canister, - canister_id, - &snapshot_id, - BlobKind::WasmModule, - dir.join("wasm_module.bin"), - retry_policy.clone(), - call_sender, - ) - .await?; + write_upload_progress_file( + upload_data( + env, + &canister, + canister_id, + &snapshot_id, + BlobKind::WasmModule, + dir.join("wasm_module.bin"), + &mut upload_progress, + concurrency, + retry_policy.clone(), + call_sender, + ) + .await, + &upload_progress, + &dir, + )?; // Upload Wasm memory. - upload_data( - env, - &canister, - canister_id, - &snapshot_id, - BlobKind::MainMemory, - dir.join("wasm_memory.bin"), - retry_policy.clone(), - call_sender, - ) - .await?; - - // Upload stable memory. - if metadata.stable_memory_size > 0 { + write_upload_progress_file( upload_data( env, &canister, canister_id, &snapshot_id, - BlobKind::StableMemory, - dir.join("stable_memory.bin"), + BlobKind::MainMemory, + dir.join("wasm_memory.bin"), + &mut upload_progress, + concurrency, retry_policy.clone(), call_sender, ) - .await?; + .await, + &upload_progress, + &dir, + )?; + + // Upload stable memory. + if metadata.stable_memory_size > 0 { + write_upload_progress_file( + upload_data( + env, + &canister, + canister_id, + &snapshot_id, + BlobKind::StableMemory, + dir.join("stable_memory.bin"), + &mut upload_progress, + concurrency, + retry_policy.clone(), + call_sender, + ) + .await, + &upload_progress, + &dir, + )?; } // Upload Wasm chunks. @@ -486,33 +602,49 @@ async fn upload( let wasm_chunk_store_dir = dir.join("wasm_chunk_store"); for chunk_hash in metadata.wasm_chunk_store { let hash_str = hex::encode(&chunk_hash.hash); + if *upload_progress + .wasm_chunks_uploaded + .get(&hash_str) + .unwrap_or(&false) + { + // Skip the chunk if it has already been uploaded. + continue; + } + let chunk_file = wasm_chunk_store_dir.join(format!("{hash_str}.bin")); let chunk_data = std::fs::read(&chunk_file).with_context(|| { format!("Failed to read Wasm chunk from '{}'", chunk_file.display()) })?; - retry(retry_policy.clone(), || async { - let data_args = UploadCanisterSnapshotDataArgs { - canister_id, - snapshot_id: snapshot_id.0.clone(), - kind: SnapshotDataOffset::WasmChunk, - chunk: chunk_data.clone(), - }; - match upload_canister_snapshot_data( - env, - canister_id, - &data_args, - call_sender, - ) - .await - { - Ok(_) => Ok(()), - Err(_error) => Err(backoff::Error::transient(anyhow!( - "Failed to upload Wasm chunk {hash_str} to snapshot {snapshot_id} in canister {canister}" - ))), - } - }) - .await?; + write_upload_progress_file( + retry(retry_policy.clone(), || async { + let data_args = UploadCanisterSnapshotDataArgs { + canister_id, + snapshot_id: snapshot_id.0.clone(), + kind: SnapshotDataOffset::WasmChunk, + chunk: chunk_data.clone(), + }; + match upload_canister_snapshot_data( + env, + canister_id, + &data_args, + call_sender, + ) + .await + { + Ok(_) => Ok(()), + Err(_error) => Err(backoff::Error::transient(anyhow!( + "Failed to upload Wasm chunk {hash_str} to snapshot {snapshot_id} in canister {canister}" + ))), + } + }) + .await, + &upload_progress, + &dir, + )?; + + upload_progress.wasm_chunks_uploaded.insert(hash_str, true); + debug!( env.get_logger(), "Snapshot Wasm chunk {} uploaded to canister {} with Snapshot ID: {}", @@ -531,12 +663,27 @@ async fn upload( Ok(()) } +fn write_upload_progress_file( + result: DfxResult, + upload_progress: &SnapshotUploadProgress, + dir: &Path, +) -> DfxResult { + save_json_file( + &dir.join(format!("{}.json", upload_progress.snapshot_id)), + &upload_progress, + )?; + result +} + fn check_dir(dir: &PathBuf) -> DfxResult { - // Check if the directory is empty. + // Check if the directory is empty if not resuming. let mut entries = std::fs::read_dir(dir) .with_context(|| format!("Failed to read directory '{}'", dir.display()))?; if entries.next().is_some() { - bail!("Directory '{}' is not empty", dir.display()); + bail!( + "Directory '{}' is not empty (to resume an aborted download, use `--resume`)", + dir.display() + ); } // Check if the directory is writable. @@ -558,13 +705,36 @@ fn check_dir(dir: &PathBuf) -> DfxResult { Ok(()) } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] enum BlobKind { WasmModule, MainMemory, StableMemory, } +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SnapshotUploadProgress { + snapshot_id: String, + metadata_uploaded: bool, + wasm_module_progress: usize, + wasm_memory_progress: usize, + stable_memory_progress: usize, + wasm_chunks_uploaded: HashMap, +} + +impl SnapshotUploadProgress { + fn new() -> Self { + Self { + snapshot_id: String::new(), + metadata_uploaded: false, + wasm_module_progress: 0, + wasm_memory_progress: 0, + stable_memory_progress: 0, + wasm_chunks_uploaded: HashMap::new(), + } + } +} + const MAX_CHUNK_SIZE: usize = 2_000_000; async fn store_data( @@ -575,6 +745,8 @@ async fn store_data( blob_kind: BlobKind, length: usize, file_path: PathBuf, + resume: bool, + concurrency: usize, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { @@ -584,23 +756,26 @@ async fn store_data( BlobKind::StableMemory => "stable memory", }; - let blob = read_blob( + debug!(env.get_logger(), "Downloading {message}"); + + download_blob_to_file( env, canister, canister_id, snapshot_id, blob_kind, length, + &file_path, retry_policy.clone(), + resume, + concurrency, call_sender, ) .await .with_context(|| { - format!("Failed to read {message} from snapshot {snapshot_id} in canister {canister}") + format!("Failed to download {message} from snapshot {snapshot_id} in canister {canister}") })?; - std::fs::write(&file_path, &blob) - .with_context(|| format!("Failed to write {message} to '{}'", file_path.display()))?; debug!( env.get_logger(), "The {message} has been saved to '{}'", @@ -610,60 +785,138 @@ async fn store_data( Ok(()) } -async fn read_blob( +async fn download_blob_to_file( env: &dyn Environment, canister: &str, canister_id: Principal, snapshot: &SnapshotId, blob_kind: BlobKind, length: usize, + file_path: &PathBuf, retry_policy: ExponentialBackoff, + resume: bool, + concurrency: usize, call_sender: &CallSender, -) -> DfxResult> { - let mut blob: Vec = vec![0; length]; +) -> DfxResult { let mut offset = 0; - while offset < length { - let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); - let kind = match blob_kind { - BlobKind::WasmModule => SnapshotDataKind::WasmModule { - offset: offset as u64, - size: chunk_size as u64, - }, - BlobKind::MainMemory => SnapshotDataKind::WasmMemory { - offset: offset as u64, - size: chunk_size as u64, - }, - BlobKind::StableMemory => SnapshotDataKind::StableMemory { - offset: offset as u64, - size: chunk_size as u64, - }, - }; - let chunk = retry(retry_policy.clone(), || async { + if resume && file_path.exists() { + let file_length = std::fs::metadata(file_path) + .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? + .len() as usize; + if file_length >= length { + return Ok(()); + } + offset = file_length; + + debug!( + env.get_logger(), + "Resuming downloading '{}' from offset {}", + file_path.display(), + offset + ); + } + + let pb = get_progress_bar(env, length as u64); + pb.set_position(offset as u64); + + // Create file if it doesn't exist, otherwise open it in append mode. + let mut file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(file_path) + .await?; + + let mut next_offset = offset; + let mut next_request_offset = offset; + let mut in_progress: FuturesUnordered<_> = FuturesUnordered::new(); + let mut ready_chunks: BTreeMap> = BTreeMap::new(); + + while next_offset < length { + // Schedule next chunk to download. + while next_request_offset < length + && in_progress.len() < concurrency + && ready_chunks.len() < concurrency * 2 + { + let chunk_size = std::cmp::min(length - next_request_offset, MAX_CHUNK_SIZE); + let kind = match blob_kind { + BlobKind::WasmModule => SnapshotDataKind::WasmModule { + offset: next_request_offset as u64, + size: chunk_size as u64, + }, + BlobKind::MainMemory => SnapshotDataKind::WasmMemory { + offset: next_request_offset as u64, + size: chunk_size as u64, + }, + BlobKind::StableMemory => SnapshotDataKind::StableMemory { + offset: next_request_offset as u64, + size: chunk_size as u64, + }, + }; let data_args = ReadCanisterSnapshotDataArgs { canister_id, snapshot_id: snapshot.0.clone(), - kind: kind.clone(), + kind, }; - match read_canister_snapshot_data(env, canister_id, &data_args, call_sender).await { - Ok(chunk) => Ok(chunk), - Err(error) if is_retryable(&error) => { - error!( - env.get_logger(), - "Failed to read {:?} from snapshot {snapshot} in canister {canister}.", - blob_kind, - ); - Err(backoff::Error::transient(anyhow!(error))) - } - Err(error) => Err(backoff::Error::permanent(anyhow!(error))), - } - }) - .await? - .chunk; - blob[offset..offset + chunk_size].copy_from_slice(&chunk); - offset += chunk_size; + let retry_policy = retry_policy.clone(); + + // Download chunk. + in_progress.push(async move { + let chunk = retry(retry_policy, || async { + match read_canister_snapshot_data(env, canister_id, &data_args, call_sender) + .await + { + Ok(chunk) => Ok(chunk), + Err(error) if is_retryable(&error) => { + error!( + env.get_logger(), + "Failed to read {:?} from snapshot {snapshot} in canister {canister}.", + blob_kind, + ); + Err(backoff::Error::transient(anyhow!(error))) + } + Err(error) => Err(backoff::Error::permanent(anyhow!(error))), + } + }) + .await? + .chunk; + + Ok::<(usize, Vec), anyhow::Error>((next_request_offset, chunk)) + }); + + next_request_offset += chunk_size; + } + + // Process completed chunks if any are ready. + while let Some(Some(res)) = in_progress.next().now_or_never() { + let (chunk_offset, chunk) = res?; + ready_chunks.insert(chunk_offset, chunk); + } + + // Write completed chunks in order. + while let Some(chunk) = ready_chunks.remove(&next_offset) { + file.write_all(&chunk).await?; + next_offset += chunk.len(); + pb.set_position(next_offset as u64); + } } - Ok(blob) + // Wait for any remaining downloads to complete. + while let Some(res) = in_progress.next().await { + let (chunk_offset, chunk) = res?; + ready_chunks.insert(chunk_offset, chunk); + + while let Some(chunk) = ready_chunks.remove(&next_offset) { + file.write_all(&chunk).await?; + next_offset += chunk.len(); + pb.set_position(next_offset as u64); + } + } + + file.flush().await?; + + pb.finish(); + + Ok(()) } async fn upload_data( @@ -673,23 +926,28 @@ async fn upload_data( snapshot_id: &SnapshotId, blob_kind: BlobKind, file_path: PathBuf, + upload_progress: &mut SnapshotUploadProgress, + concurrency: usize, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { - let message = match blob_kind { - BlobKind::WasmModule => "Wasm module", - BlobKind::MainMemory => "Wasm memory", - BlobKind::StableMemory => "stable memory", + let (message, progress) = match blob_kind { + BlobKind::WasmModule => ("Wasm module", &mut upload_progress.wasm_module_progress), + BlobKind::MainMemory => ("Wasm memory", &mut upload_progress.wasm_memory_progress), + BlobKind::StableMemory => ("stable memory", &mut upload_progress.stable_memory_progress), }; - let blob = std::fs::read(&file_path) - .with_context(|| format!("Failed to read {message} from '{}'", file_path.display()))?; + + debug!(env.get_logger(), "Uploading {message}"); + upload_blob( env, canister, canister_id, snapshot_id, blob_kind, - blob, + &file_path, + progress, + concurrency, retry_policy.clone(), call_sender, ) @@ -699,7 +957,8 @@ async fn upload_data( })?; debug!( env.get_logger(), - "Snapshot {message} uploaded to canister {canister} with Snapshot ID: {snapshot_id}" + "The {message} has been uploaded from '{}'", + file_path.display() ); Ok(()) @@ -711,49 +970,128 @@ async fn upload_blob( canister_id: Principal, snapshot: &SnapshotId, blob_kind: BlobKind, - data: Vec, + file_path: &PathBuf, + upload_progress: &mut usize, + concurrency: usize, retry_policy: ExponentialBackoff, call_sender: &CallSender, ) -> DfxResult { - let length = data.len(); - let mut offset = 0; - while offset < length { - let chunk_size = std::cmp::min(length - offset, MAX_CHUNK_SIZE); - let kind = match blob_kind { - BlobKind::WasmModule => SnapshotDataOffset::WasmModule { - offset: offset as u64, - }, - BlobKind::MainMemory => SnapshotDataOffset::WasmMemory { - offset: offset as u64, - }, - BlobKind::StableMemory => SnapshotDataOffset::StableMemory { - offset: offset as u64, - }, - }; - retry(retry_policy.clone(), || async { + let length = std::fs::metadata(file_path) + .with_context(|| format!("Failed to get length of file '{}'", file_path.display()))? + .len() as usize; + + let offset = *upload_progress; + if offset >= length { + return Ok(()); + } + if offset > 0 { + debug!( + env.get_logger(), + "Resuming uploading '{}' from offset {}", + file_path.display(), + offset + ); + } + + let pb = get_progress_bar(env, length as u64); + pb.set_position(offset as u64); + + // Open the file once and seek to the offset. + let mut file = tokio::fs::File::open(file_path) + .await + .with_context(|| format!("Failed to open file '{}' for reading", file_path.display()))?; + file.seek(SeekFrom::Start(offset as u64)).await?; + + let mut next_offset = offset; + let mut next_request_offset = offset; + let mut in_progress: FuturesUnordered<_> = FuturesUnordered::new(); + let mut completed: BTreeMap = BTreeMap::new(); + + while next_offset < length { + // Schedule next chunk to upload. + while next_request_offset < length + && in_progress.len() < concurrency + && completed.len() < concurrency * 2 + { + let chunk_size = std::cmp::min(length - next_request_offset, MAX_CHUNK_SIZE); + let kind = match blob_kind { + BlobKind::WasmModule => SnapshotDataOffset::WasmModule { + offset: next_request_offset as u64, + }, + BlobKind::MainMemory => SnapshotDataOffset::WasmMemory { + offset: next_request_offset as u64, + }, + BlobKind::StableMemory => SnapshotDataOffset::StableMemory { + offset: next_request_offset as u64, + }, + }; + + // Read the bytes for this chunk. + let mut chunk = vec![0u8; chunk_size]; + file.read_exact(&mut chunk).await?; + let data_args = UploadCanisterSnapshotDataArgs { canister_id, snapshot_id: snapshot.0.clone(), - kind: kind.clone(), - chunk: data[offset..offset + chunk_size].to_vec(), + kind, + chunk, }; - match upload_canister_snapshot_data(env, canister_id, &data_args, call_sender).await { - Ok(_) => Ok(()), - Err(error) if is_retryable(&error) => { - error!( - env.get_logger(), - "Failed to upload {:?} to snapshot {snapshot} in canister {canister}.", - blob_kind, - ); - Err(backoff::Error::transient(anyhow!(error))) - } - Err(error) => Err(backoff::Error::permanent(anyhow!(error))), - } - }) - .await?; - offset += chunk_size; + let retry_policy = retry_policy.clone(); + + // Start upload for this chunk. + in_progress.push(async move { + retry(retry_policy, || async { + match upload_canister_snapshot_data(env, canister_id, &data_args, call_sender) + .await + { + Ok(()) => Ok(()), + Err(error) if is_retryable(&error) => { + error!( + env.get_logger(), + "Failed to upload {:?} to snapshot {snapshot} in canister {canister}.", + blob_kind, + ); + Err(backoff::Error::transient(anyhow!(error))) + } + Err(error) => Err(backoff::Error::permanent(anyhow!(error))), + } + }) + .await?; + + Ok::<(usize, usize), anyhow::Error>((next_request_offset, chunk_size)) + }); + + next_request_offset += chunk_size; + } + + // Process completed uploads if any are ready. + while let Some(Some(res)) = in_progress.next().now_or_never() { + let (uploaded_offset, uploaded_size) = res?; + completed.insert(uploaded_offset, uploaded_size); + } + + // Advance and update the progress bar. + while let Some(size) = completed.remove(&next_offset) { + next_offset += size; + pb.set_position(next_offset as u64); + *upload_progress = next_offset; + } } + // Wait for any remaining uploads to complete. + while let Some(res) = in_progress.next().await { + let (uploaded_offset, uploaded_size) = res?; + completed.insert(uploaded_offset, uploaded_size); + + while let Some(size) = completed.remove(&next_offset) { + next_offset += size; + pb.set_position(next_offset as u64); + *upload_progress = next_offset; + } + } + + pb.finish(); + Ok(()) } @@ -764,3 +1102,12 @@ fn is_retryable(error: &Error) -> bool { false } + +fn get_progress_bar(env: &dyn Environment, total_size: u64) -> ProgressBar { + let pb = env.new_progress(total_size); + pb.set_style(ProgressStyle::default_bar() + .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})") + .expect("Failed to set template string") + .progress_chars("#>-")); + pb +} diff --git a/src/dfx/src/lib/environment.rs b/src/dfx/src/lib/environment.rs index b551926351..e647f56baf 100644 --- a/src/dfx/src/lib/environment.rs +++ b/src/dfx/src/lib/environment.rs @@ -60,7 +60,7 @@ pub trait Environment: Send + Sync { fn get_verbose_level(&self) -> i64; fn new_spinner(&self, message: Cow<'static, str>) -> ProgressBar; fn with_suspend_all_spinners(&self, f: Box); // box needed for dyn Environment - fn new_progress(&self, message: &str) -> ProgressBar; + fn new_progress(&self, total_size: u64) -> ProgressBar; fn new_identity_manager(&self) -> Result { IdentityManager::new( @@ -294,8 +294,13 @@ impl Environment for EnvironmentImpl { self.spinners.suspend(f); } - fn new_progress(&self, _message: &str) -> ProgressBar { - ProgressBar::discard() + fn new_progress(&self, total_size: u64) -> ProgressBar { + // Only show the progress bar if the level is INFO or more. + if self.verbose_level >= 0 { + ProgressBar::new_progress(total_size, &self.spinners) + } else { + ProgressBar::discard() + } } fn get_selected_identity(&self) -> Option<&String> { @@ -481,8 +486,8 @@ impl<'a> Environment for AgentEnvironment<'a> { self.backend.with_suspend_all_spinners(f); } - fn new_progress(&self, message: &str) -> ProgressBar { - self.backend.new_progress(message) + fn new_progress(&self, total_size: u64) -> ProgressBar { + self.backend.new_progress(total_size) } fn get_selected_identity(&self) -> Option<&String> { @@ -604,7 +609,7 @@ pub mod test_env { fn new_identity_manager(&self) -> Result { unimplemented!() } - fn new_progress(&self, _message: &str) -> ProgressBar { + fn new_progress(&self, _total_size: u64) -> ProgressBar { ProgressBar::discard() } fn with_suspend_all_spinners(&self, f: Box) { diff --git a/src/dfx/src/lib/progress_bar.rs b/src/dfx/src/lib/progress_bar.rs index ccb7fccd90..53eca86309 100644 --- a/src/dfx/src/lib/progress_bar.rs +++ b/src/dfx/src/lib/progress_bar.rs @@ -1,5 +1,5 @@ #![allow(clippy::disallowed_types)] -use indicatif::{MultiProgress, ProgressBar as IndicatifProgressBar}; +use indicatif::{MultiProgress, ProgressBar as IndicatifProgressBar, ProgressStyle}; use std::{borrow::Cow, time::Duration}; pub struct ProgressBar { @@ -36,8 +36,20 @@ impl ProgressBar { } } + pub fn new_progress(total_size: u64, set: &MultiProgress) -> Self { + let progress_bar = IndicatifProgressBar::new(total_size); + set.add(progress_bar.clone()); + + ProgressBar { + bar: Some(progress_bar), + } + } + + forward_fn_impl!(finish); forward_fn_impl!(finish_and_clear); forward_fn_impl!(set_message, message: Cow<'static, str>); + forward_fn_impl!(set_position, position: u64); + forward_fn_impl!(set_style, style: ProgressStyle); pub fn discard() -> Self { ProgressBar { bar: None }