Combining the streamz.Stream.filenames() and streamz.Stream.from_textfile() using dask scatter? #480

@ayanb07

I want to use streamz.Stream.filenames() to feed filenames in parallel into streamz.Stream.from_textfile() and run computations on the lines of each text file for as long as new lines keep arriving. As the API stands, I can use either one, but not both together. Below is the existing code, written for a single file only. Each signal consists of 1000 values, and I get 4K signals per second per channel. This one file corresponds to one channel.

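import streamz

# Stream the lines of a single channel file.
source = streamz.Stream.from_textfile(r"~\Channel_C.txt")

def split(line):
    # Keep the second whitespace-separated column as a float.
    return float(line.strip().split()[1])

# Group 1000 consecutive values into one signal.
pulse = source.map(split).partition(1000).map(list)
# invert_signal, find_true_cfd and write_true_cfd_to_file are my own functions (not shown).
pulse.map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file)
source.start()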
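
For reference, the parse-and-batch step above can be sketched without streamz. This is only an illustration in plain Python of what `map(split).partition(1000)` is expected to do here. `parse_value` and `batches` are hypothetical names, and it assumes every line has at least two whitespace-separated columns with the sample in the second one. It also assumes that only full groups of 1000 are passed on, so a trailing partial group is dropped.

```python
from itertools import islice


def parse_value(line):
    """Return the second whitespace-separated column of a line as a float."""
    return float(line.strip().split()[1])


def batches(values, size=1000):
    """Yield consecutive lists of exactly `size` values.

    A trailing group with fewer than `size` values is dropped
    (an assumption made for this sketch).
    """
    it = iter(values)
    while True:
        chunk = list(islice(it, size))
        if len(chunk) < size:
            return
        yield chunk
```

Each list yielded by `batches(map(parse_value, lines))` corresponds to one 1000-value signal that would then go through the inversion and CFD steps.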

If the two APIs could be combined, I would like to do the following: distribute the per-channel files (there will be 200 of them) across dask worker threads.

from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()

# Desired usage; this chaining is not possible with the current API.
source = (
    streamz.Stream.filenames("*.txt").scatter().from_textfile()
    .map(split).partition(1000).map(list)
    .map(invert_signal).map(find_true_cfd).map(write_true_cfd_to_file)
)
source.start()
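
The intended fan-out (one worker per channel file) can be approximated with the standard library alone. The sketch below is not streamz or dask code and does not follow files that are still growing. It assumes each file is already fully written, and `process_file`, `process_all` and the `handle_pulse` callback are hypothetical names standing in for the per-signal processing chain.

```python
import glob
from concurrent.futures import ThreadPoolExecutor


def process_file(path, handle_pulse, size=1000):
    """Read one channel file and call handle_pulse on every full batch of samples."""
    batch, count = [], 0
    with open(path) as fh:
        for line in fh:
            parts = line.split()
            if len(parts) < 2:
                continue  # skip blank or malformed lines
            batch.append(float(parts[1]))
            if len(batch) == size:
                handle_pulse(batch)
                batch, count = [], count + 1
    return path, count


def process_all(pattern, handle_pulse, max_workers=8):
    """Process every file matching pattern in a thread pool.

    Returns a dict mapping each path to the number of full batches it produced.
    """
    paths = sorted(glob.glob(pattern))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(lambda p: process_file(p, handle_pulse), paths)
        return dict(results)
```

With 200 channel files, `process_all("*.txt", handle_pulse)` would run up to `max_workers` files concurrently. For CPU-bound processing, threads may not give a real speedup, which is part of the reason for wanting dask workers.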
