Skip to content

Commit b5e9e4d

Browse files
feat(downloads): interleave the downloads with their installations
Even though downloads are done concurrently, the installations are done sequentially. This means that, as downloads complete, they are in a queue (an mpsc channel) waiting to be consumed by the future responsible for the (sequential) installations. There was a need to relax some test cases to allow for uninstall to happen before the downloads.
1 parent 59aec45 commit b5e9e4d

File tree

2 files changed

+104
-49
lines changed

2 files changed

+104
-49
lines changed

src/dist/manifestation.rs

Lines changed: 104 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ mod tests;
66

77
use std::path::Path;
88

9-
use anyhow::{Context, Result, anyhow, bail};
9+
use anyhow::{Context, Error, Result, anyhow, bail};
1010
use futures_util::stream::StreamExt;
1111
use std::sync::Arc;
12-
use tokio::sync::Semaphore;
12+
use tokio::sync::{Semaphore, mpsc};
1313
use tracing::{info, warn};
1414
use url::Url;
1515

@@ -156,8 +156,7 @@ impl Manifestation {
156156
let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;
157157

158158
// Download component packages and validate hashes
159-
let mut things_to_install = Vec::new();
160-
let mut things_downloaded = Vec::new();
159+
let mut things_downloaded: Vec<String> = Vec::new();
161160
let components = update
162161
.components_urls_and_hashes(new_manifest)
163162
.map(|res| {
@@ -168,7 +167,6 @@ impl Manifestation {
168167
})
169168
})
170169
.collect::<Result<Vec<_>>>()?;
171-
172170
let components_len = components.len();
173171
const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2;
174172
let concurrent_downloads = download_cfg
@@ -184,46 +182,15 @@ impl Manifestation {
184182
.and_then(|s| s.parse().ok())
185183
.unwrap_or(DEFAULT_MAX_RETRIES);
186184

187-
info!("downloading component(s)");
188-
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
189-
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
190-
let sem = semaphore.clone();
191-
async move {
192-
let _permit = sem.acquire().await.unwrap();
193-
let url = if altered {
194-
utils::parse_url(
195-
&bin.binary
196-
.url
197-
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
198-
)?
199-
} else {
200-
utils::parse_url(&bin.binary.url)?
201-
};
202-
203-
bin.download(&url, download_cfg, max_retries, new_manifest)
204-
.await
205-
.map(|downloaded| (bin, downloaded))
206-
}
207-
});
208-
if components_len > 0 {
209-
let results = component_stream
210-
.buffered(components_len)
211-
.collect::<Vec<_>>()
212-
.await;
213-
for result in results {
214-
let (bin, downloaded_file) = result?;
215-
things_downloaded.push(bin.binary.hash.clone());
216-
things_to_install.push((bin, downloaded_file));
217-
}
218-
}
219-
220-
// Begin transaction
185+
// Begin transaction before the downloads, as installations are interleaved with those
221186
let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.process);
222187

223188
// If the previous installation was from a v1 manifest we need
224189
// to uninstall it first.
225190
tx = self.maybe_handle_v2_upgrade(&config, tx, download_cfg.process)?;
226191

192+
info!("downloading component(s)");
193+
227194
// Uninstall components
228195
for component in &update.components_to_uninstall {
229196
match (implicit_modify, &component.target) {
@@ -255,9 +222,103 @@ impl Manifestation {
255222
tx = self.uninstall_component(component, new_manifest, tx, download_cfg.process)?;
256223
}
257224

258-
// Install components
259-
for (component_bin, installer_file) in things_to_install {
260-
tx = component_bin.install(installer_file, tx, new_manifest, self, download_cfg)?;
225+
if components_len > 0 {
226+
// Create a channel to communicate whenever a download is done and the component can be installed
227+
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
228+
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
229+
let total_components = components.len();
230+
let (download_tx, mut download_rx) =
231+
mpsc::channel::<Result<(ComponentBinary<'_>, File)>>(total_components);
232+
233+
#[allow(clippy::too_many_arguments)]
234+
fn component_stream<'a>(
235+
components: Vec<ComponentBinary<'a>>,
236+
semaphore: Arc<Semaphore>,
237+
download_tx: mpsc::Sender<Result<(ComponentBinary<'a>, File)>>,
238+
altered: bool,
239+
dist_server: &str,
240+
download_cfg: &DownloadCfg<'a>,
241+
max_retries: usize,
242+
new_manifest: &Manifest,
243+
) -> impl futures_util::Stream<Item = impl Future<Output = Result<String>>>
244+
{
245+
tokio_stream::iter(components).map(move |bin| {
246+
let sem = semaphore.clone();
247+
let download_tx = download_tx.clone();
248+
async move {
249+
let _permit = sem.acquire().await.unwrap();
250+
let url = if altered {
251+
utils::parse_url(
252+
&bin.binary.url.replace(DEFAULT_DIST_SERVER, dist_server),
253+
)?
254+
} else {
255+
utils::parse_url(&bin.binary.url)?
256+
};
257+
258+
let installer_file = bin
259+
.download(&url, download_cfg, max_retries, new_manifest)
260+
.await?;
261+
let hash = bin.binary.hash.clone();
262+
let _ = download_tx.send(Ok((bin, installer_file))).await;
263+
Ok(hash)
264+
}
265+
})
266+
}
267+
268+
async fn download_handle(
269+
mut stream: impl futures_util::Stream<Item = Result<String>> + Unpin,
270+
download_tx: mpsc::Sender<Result<(ComponentBinary<'_>, File)>>,
271+
) -> Vec<String> {
272+
let mut hashes = Vec::new();
273+
while let Some(result) = stream.next().await {
274+
match result {
275+
Ok(hash) => {
276+
hashes.push(hash);
277+
}
278+
Err(e) => {
279+
let _ = download_tx.send(Err(e)).await;
280+
}
281+
}
282+
}
283+
hashes
284+
}
285+
286+
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
287+
let component_stream = component_stream(
288+
components,
289+
semaphore,
290+
download_tx.clone(),
291+
altered,
292+
tmp_cx.dist_server.as_str(),
293+
&download_cfg,
294+
max_retries,
295+
&new_manifest,
296+
);
297+
298+
let stream = component_stream.buffered(components_len);
299+
let download_handle = download_handle(stream, download_tx.clone());
300+
let install_handle = async {
301+
let mut current_tx = tx;
302+
let mut counter = 0;
303+
while counter < total_components
304+
&& let Some(message) = download_rx.recv().await
305+
{
306+
let (component_bin, installer_file) = message?;
307+
current_tx = component_bin.install(
308+
installer_file,
309+
current_tx,
310+
new_manifest,
311+
self,
312+
download_cfg,
313+
)?;
314+
counter += 1;
315+
}
316+
Ok::<_, Error>(current_tx)
317+
};
318+
319+
let (download_results, install_result) = tokio::join!(download_handle, install_handle);
320+
things_downloaded = download_results;
321+
tx = install_result?;
261322
}
262323

263324
// Install new distribution manifest
@@ -709,7 +770,7 @@ impl<'a> ComponentBinary<'a> {
709770
let downloaded_file = RetryIf::spawn(
710771
FixedInterval::from_millis(0).take(max_retries),
711772
|| download_cfg.download(url, &self.binary.hash, &self.status),
712-
|e: &anyhow::Error| {
773+
|e: &Error| {
713774
// retry only known retriable cases
714775
match e.downcast_ref::<RustupError>() {
715776
Some(RustupError::BrokenPartialFile)

tests/suite/cli_rustup.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ async fn rustup_stable() {
3535
.with_stderr(snapbox::str![[r#"
3636
info: syncing channel updates for stable-[HOST_TRIPLE]
3737
info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0)
38-
info: downloading component[..]
3938
...
4039
info: cleaning up downloads & tmp directories
4140
@@ -131,15 +130,12 @@ async fn rustup_all_channels() {
131130
.with_stderr(snapbox::str![[r#"
132131
info: syncing channel updates for stable-[HOST_TRIPLE]
133132
info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0)
134-
info: downloading component[..]
135133
...
136134
info: syncing channel updates for beta-[HOST_TRIPLE]
137135
info: latest update on 2015-01-02 for version 1.2.0 (hash-beta-1.2.0)
138-
info: downloading component[..]
139136
...
140137
info: syncing channel updates for nightly-[HOST_TRIPLE]
141138
info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2)
142-
info: downloading component[..]
143139
...
144140
info: cleaning up downloads & tmp directories
145141
@@ -208,12 +204,10 @@ async fn rustup_some_channels_up_to_date() {
208204
.with_stderr(snapbox::str![[r#"
209205
info: syncing channel updates for stable-[HOST_TRIPLE]
210206
info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0)
211-
info: downloading component[..]
212207
...
213208
info: syncing channel updates for beta-[HOST_TRIPLE]
214209
info: syncing channel updates for nightly-[HOST_TRIPLE]
215210
info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2)
216-
info: downloading component[..]
217211
...
218212
info: cleaning up downloads & tmp directories
219213

0 commit comments

Comments
 (0)