In a simple use case like downloading files and processing them on a single machine, how can one parallelize the downloads and the processing using buffers?

Example:
```python
import time

from streamz import Stream
from tornado.ioloop import IOLoop


def download_file(file_id: int):
    time.sleep(1)
    print(f"Downloaded file: {file_id}")
    return file_id


def process_file(file_id: int):
    time.sleep(2)
    print(f"Processed file : {file_id}")
    return file_id


async def streamz_run():
    s = Stream(asynchronous=True)
    s.map(download_file).buffer(4).sink(process_file)
    for i in range(10):
        await s.emit(i)


if __name__ == '__main__':
    start = time.time()
    IOLoop().run_sync(streamz_run)
    print(f"Streamz run took: {time.time() - start}s")
```
The `download_file` results are properly buffered, but the downloads never execute at the same time as `process_file`. The whole run takes ~30 s, whereas with downloads and processing overlapping we could expect ~21 s. Is using Dask the intended way to get that parallelism?
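For reference, here is a plain-asyncio sketch (no streamz) of the overlap I have in mind: the two stages run as independent tasks connected by a bounded queue, and the blocking `time.sleep` calls are replaced by non-blocking `asyncio.sleep` and scaled down. All names here are hypothetical; this is just to illustrate the expected timing, not the streamz API.

```python
import asyncio
import time


async def download_file(file_id: int) -> int:
    await asyncio.sleep(0.1)  # non-blocking stand-in for the 1 s download
    return file_id


async def process_file(file_id: int) -> int:
    await asyncio.sleep(0.2)  # non-blocking stand-in for the 2 s processing
    return file_id


async def pipeline(n: int, buffer_size: int = 4) -> list:
    # Bounded queue plays the role of .buffer(4): the producer blocks
    # once buffer_size downloaded-but-unprocessed items are pending.
    queue = asyncio.Queue(maxsize=buffer_size)
    results = []

    async def producer():
        for i in range(n):
            await queue.put(await download_file(i))
        await queue.put(None)  # sentinel: no more files

    async def consumer():
        while (item := await queue.get()) is not None:
            results.append(await process_file(item))

    # Both stages run concurrently on the same event loop.
    await asyncio.gather(producer(), consumer())
    return results


start = time.time()
out = asyncio.run(pipeline(10))
elapsed = time.time() - start
print(out, f"{elapsed:.2f}s")
```

With the stages overlapping, the total is roughly one download plus ten processing steps (~2.1 s at these scaled durations) instead of the sequential ~3.0 s. My understanding is that in the streamz version the blocking `time.sleep` inside `download_file`/`process_file` stalls the Tornado event loop, which is why `.buffer(4)` alone does not produce this overlap.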