@@ -7,9 +7,9 @@ mod tests;
77use std:: path:: Path ;
88
99use anyhow:: { Context , Error , Result , anyhow, bail} ;
10- use futures_util:: stream:: StreamExt ;
10+ use futures_util:: stream:: { FuturesUnordered , StreamExt } ;
1111use std:: sync:: Arc ;
12- use tokio:: sync:: { Semaphore , mpsc } ;
12+ use tokio:: sync:: Semaphore ;
1313use tracing:: { info, warn} ;
1414use url:: Url ;
1515
@@ -228,110 +228,82 @@ impl Manifestation {
228228 // The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
229229 // This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
230230 let total_components = components. len ( ) ;
231- let ( download_tx, mut download_rx) =
232- mpsc:: channel :: < Result < ( ComponentBinary , File ) > > ( total_components) ;
233231
234- #[ allow( clippy:: too_many_arguments) ]
235- fn component_stream (
232+ fn create_download_futures (
236233 components : Vec < ComponentBinary > ,
237234 semaphore : Arc < Semaphore > ,
238- download_tx : mpsc:: Sender < Result < ( ComponentBinary , File ) > > ,
239235 altered : bool ,
240236 dist_server : & str ,
241237 download_cfg : & DownloadCfg ,
242238 max_retries : usize ,
243239 new_manifest : & Manifest ,
244- ) -> impl futures_util :: Stream < Item = impl Future < Output = Result < String > > >
240+ ) -> FuturesUnordered < impl Future < Output = Result < ( ComponentBinary , File , String ) > > >
245241 {
246- tokio_stream:: iter ( components) . map ( move |bin| {
242+ let futures = FuturesUnordered :: new ( ) ;
243+ for bin in components {
247244 let sem = semaphore. clone ( ) ;
248- let download_tx = download_tx. clone ( ) ;
249- async move {
245+ let dist_server = dist_server. to_string ( ) ;
246+ let download_cfg = download_cfg. clone ( ) ;
247+ let new_manifest = new_manifest. clone ( ) ;
248+
249+ let future = async move {
250250 let _permit = sem. acquire ( ) . await . unwrap ( ) ;
251251 let url = if altered {
252252 utils:: parse_url (
253- & bin. binary . url . replace ( DEFAULT_DIST_SERVER , dist_server) ,
253+ & bin. binary . url . replace ( DEFAULT_DIST_SERVER , & dist_server) ,
254254 ) ?
255255 } else {
256256 utils:: parse_url ( & bin. binary . url ) ?
257257 } ;
258258
259259 let installer_file = bin
260- . download ( & url, download_cfg, max_retries, new_manifest)
260+ . download ( & url, & download_cfg, max_retries, & new_manifest)
261261 . await ?;
262262 let hash = bin. binary . hash . clone ( ) ;
263- let _ = download_tx. send ( Ok ( ( bin, installer_file) ) ) . await ;
264- Ok ( hash)
265- }
266- } )
267- }
268-
269- async fn download_handle (
270- mut stream : impl futures_util:: Stream < Item = Result < String > > + Unpin ,
271- download_tx : mpsc:: Sender < Result < ( ComponentBinary , File ) > > ,
272- ) -> Vec < String > {
273- let mut hashes = Vec :: new ( ) ;
274- while let Some ( result) = stream. next ( ) . await {
275- match result {
276- Ok ( hash) => {
277- hashes. push ( hash) ;
278- }
279- Err ( e) => {
280- let _ = download_tx. send ( Err ( e) ) . await ;
281- }
282- }
263+ Ok ( ( bin, installer_file, hash) )
264+ } ;
265+ futures. push ( future) ;
283266 }
284- hashes
267+ futures
285268 }
286269
287270 let semaphore = Arc :: new ( Semaphore :: new ( concurrent_downloads) ) ;
288- let component_stream = component_stream (
271+ let mut download_stream = create_download_futures (
289272 components,
290273 semaphore,
291- download_tx. clone ( ) ,
292274 altered,
293275 tmp_cx. dist_server . as_str ( ) ,
294276 & download_cfg,
295277 max_retries,
296278 & new_manifest,
297279 ) ;
298280
299- let stream = component_stream. buffered ( components_len) ;
300- let download_handle = download_handle ( stream, download_tx. clone ( ) ) ;
301- let install_handle = {
302- let new_manifest = new_manifest. clone ( ) ;
303- let download_cfg = download_cfg. clone ( ) ;
304- async move {
305- let mut current_tx = tx;
306- let mut counter = 0 ;
307- while counter < total_components
308- && let Some ( message) = download_rx. recv ( ) . await
309- {
310- let ( component_bin, installer_file) = message?;
311- current_tx = tokio:: task:: spawn_blocking ( {
312- let this = self . clone ( ) ;
313- let new_manifest = new_manifest. clone ( ) ;
314- let download_cfg = download_cfg. clone ( ) ;
315- move || {
316- component_bin. install (
317- installer_file,
318- current_tx,
319- & new_manifest,
320- & this,
321- & download_cfg,
322- )
323- }
324- } )
325- . await ??;
326- counter += 1 ;
327- }
328- Ok :: < _ , Error > ( current_tx)
281+ let mut counter = 0 ;
282+ while counter < total_components {
283+ if let Some ( result) = download_stream. next ( ) . await {
284+ let ( component_bin, installer_file, hash) = result?;
285+ things_downloaded. push ( hash) ;
286+
287+ tx = tokio:: task:: spawn_blocking ( {
288+ let this = self . clone ( ) ;
289+ let new_manifest = new_manifest. clone ( ) ;
290+ let download_cfg = download_cfg. clone ( ) ;
291+ move || {
292+ component_bin. install (
293+ installer_file,
294+ tx,
295+ & new_manifest,
296+ & this,
297+ & download_cfg,
298+ )
299+ }
300+ } )
301+ . await ??;
302+ counter += 1 ;
303+ } else {
304+ break ;
329305 }
330- } ;
331-
332- let ( download_results, install_result) = tokio:: join!( download_handle, install_handle) ;
333- things_downloaded = download_results;
334- tx = install_result?;
306+ }
335307 }
336308
337309 // Install new distribution manifest
0 commit comments