I want to use the streamz.Stream.filenames() API to feed filenames in parallel into streamz.Stream.from_textfile() and run a computation on the lines of each text file as they keep arriving. However, the API is designed so that I can use one or the other, but not both together. Below is the existing code, written partially and for a single file only. Each signal I receive consists of 1000 values, and I get 4,000 signals per second per channel; this one file corresponds to one channel.
import streamz

# Stream lines from a single channel's text file as they arrive
source = streamz.Stream.from_textfile(r"~\Channel_C.txt")

def split(line):
    # Extract the second whitespace-separated column as a float
    return float(line.strip().split()[1])

# Group values into pulses of 1000 samples and process each pulse
pulse = source.map(split).partition(1000).map(list)
pulse.map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file)
source.start()
If the two APIs could be combined, I would like to do the following: distribute the files I get per channel (there will be 200 such files) across Dask worker threads.
from dask.distributed import LocalCluster

cluster = LocalCluster()
client = cluster.get_client()

# Desired (not currently possible): chain filenames() into from_textfile()
source = (streamz.Stream.filenames("*.txt").scatter().from_textfile()
          .map(split).partition(1000).map(list)
          .map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file))
source.start()
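The closest approximation I can see with the current API is to watch for new files with filenames() and, in a callback, wire up a separate from_textfile() pipeline for each file. This is only a rough sketch under my own assumptions: attach_channel_pipeline is a hypothetical helper I made up, and it reuses split, invert_signal, find_true_cfd and write_true_cfd_to_file from above.

def attach_channel_pipeline(filename):
    # Hypothetical helper: build an independent per-file pipeline
    file_source = streamz.Stream.from_textfile(filename)
    pulse = file_source.map(split).partition(1000).map(list)
    pulse.map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file)
    file_source.start()

# Watch the directory and attach a pipeline to every new channel file
filenames = streamz.Stream.filenames("*.txt")
filenames.sink(attach_channel_pipeline)
filenames.start()

This keeps everything on the local event loop instead of Dask workers, which is exactly why a combined filenames()/from_textfile() (or an equivalent way to scatter per-file line streams) would help.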