Skip to content

Commit bf4dabd

Browse files
committed
Add async run method and dview argument to algos
1 parent 8e0a2d2 commit bf4dabd

File tree

5 files changed

+92
-48
lines changed

5 files changed

+92
-48
lines changed

mesmerize_core/algorithms/cnmf.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Performs CNMF in a separate process"""
22

3+
import asyncio
34
import click
45
import caiman as cm
56
from caiman.source_extraction.cnmf import cnmf as cnmf
@@ -21,7 +22,10 @@
2122
from ..utils import IS_WINDOWS
2223

2324

24-
def run_algo(batch_path, uuid, data_path: str = None):
25+
def run_algo(batch_path, uuid, data_path: str = None, dview=None):
26+
asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview))
27+
28+
async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None):
2529
algo_start = time.time()
2630
set_parent_raw_data_path(data_path)
2731

@@ -42,18 +46,23 @@ def run_algo(batch_path, uuid, data_path: str = None):
4246
f"Starting CNMF item:\n{item}\nWith params:{params}"
4347
)
4448

45-
# adapted from current demo notebook
46-
if "MESMERIZE_N_PROCESSES" in os.environ.keys():
47-
try:
48-
n_processes = int(os.environ["MESMERIZE_N_PROCESSES"])
49-
except:
50-
n_processes = psutil.cpu_count() - 1
49+
if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'):
50+
n_processes = dview._processes
51+
elif 'ipyparallel' in str(type(dview)):
52+
n_processes = len(dview)
5153
else:
52-
n_processes = psutil.cpu_count() - 1
53-
# Start cluster for parallel processing
54-
c, dview, n_processes = cm.cluster.setup_cluster(
55-
backend="local", n_processes=n_processes, single_thread=False
56-
)
54+
# adapted from current demo notebook
55+
if "MESMERIZE_N_PROCESSES" in os.environ.keys():
56+
try:
57+
n_processes = int(os.environ["MESMERIZE_N_PROCESSES"])
58+
except:
59+
n_processes = psutil.cpu_count() - 1
60+
else:
61+
n_processes = psutil.cpu_count() - 1
62+
# Start cluster for parallel processing
63+
c, dview, n_processes = cm.cluster.setup_cluster(
64+
backend="multiprocessing", n_processes=n_processes, single_thread=False
65+
)
5766

5867
# merge cnmf and eval kwargs into one dict
5968
cnmf_params = CNMFParams(params_dict=params["main"])

mesmerize_core/algorithms/cnmfe.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import click
23
import numpy as np
34
import caiman as cm
@@ -18,7 +19,10 @@
1819
from ..utils import IS_WINDOWS
1920

2021

21-
def run_algo(batch_path, uuid, data_path: str = None):
22+
def run_algo(batch_path, uuid, data_path: str = None, dview=None):
23+
asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview))
24+
25+
async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None):
2226
algo_start = time.time()
2327
set_parent_raw_data_path(data_path)
2428

@@ -35,18 +39,23 @@ def run_algo(batch_path, uuid, data_path: str = None):
3539
params = item["params"]
3640
print("cnmfe params:", params)
3741

38-
# adapted from current demo notebook
39-
if "MESMERIZE_N_PROCESSES" in os.environ.keys():
40-
try:
41-
n_processes = int(os.environ["MESMERIZE_N_PROCESSES"])
42-
except:
43-
n_processes = psutil.cpu_count() - 1
42+
if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'):
43+
n_processes = dview._processes
44+
elif 'ipyparallel' in str(type(dview)):
45+
n_processes = len(dview)
4446
else:
45-
n_processes = psutil.cpu_count() - 1
46-
# Start cluster for parallel processing
47-
c, dview, n_processes = cm.cluster.setup_cluster(
48-
backend="local", n_processes=n_processes, single_thread=False
49-
)
47+
# adapted from current demo notebook
48+
if "MESMERIZE_N_PROCESSES" in os.environ.keys():
49+
try:
50+
n_processes = int(os.environ["MESMERIZE_N_PROCESSES"])
51+
except:
52+
n_processes = psutil.cpu_count() - 1
53+
else:
54+
n_processes = psutil.cpu_count() - 1
55+
# Start cluster for parallel processing
56+
c, dview, n_processes = cm.cluster.setup_cluster(
57+
backend="multiprocessing", n_processes=n_processes, single_thread=False
58+
)
5059

5160
try:
5261
fname_new = cm.save_memmap(

mesmerize_core/algorithms/mcorr.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import traceback
2+
import asyncio
23
import click
34
import caiman as cm
45
from caiman.source_extraction.cnmf.params import CNMFParams
@@ -18,7 +19,10 @@
1819
from ..batch_utils import set_parent_raw_data_path, load_batch
1920

2021

21-
def run_algo(batch_path, uuid, data_path: str = None):
22+
def run_algo(batch_path, uuid, data_path: str = None, dview=None):
23+
asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview))
24+
25+
async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None):
2226
algo_start = time.time()
2327
set_parent_raw_data_path(data_path)
2428

@@ -40,19 +44,25 @@ def run_algo(batch_path, uuid, data_path: str = None):
4044
params = item["params"]
4145

4246
# adapted from current demo notebook
43-
if "MESMERIZE_N_PROCESSES" in os.environ.keys():
44-
try:
45-
n_processes = int(os.environ["MESMERIZE_N_PROCESSES"])
46-
except:
47-
n_processes = psutil.cpu_count() - 1
47+
if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'):
48+
n_processes = dview._processes
49+
elif 'ipyparallel' in str(type(dview)):
50+
n_processes = len(dview)
4851
else:
49-
n_processes = psutil.cpu_count() - 1
52+
# adapted from current demo notebook
53+
if "MESMERIZE_N_PROCESSES" in os.environ.keys():
54+
try:
55+
n_processes = int(os.environ["MESMERIZE_N_PROCESSES"])
56+
except:
57+
n_processes = psutil.cpu_count() - 1
58+
else:
59+
n_processes = psutil.cpu_count() - 1
60+
# Start cluster for parallel processing
61+
c, dview, n_processes = cm.cluster.setup_cluster(
62+
backend="multiprocessing", n_processes=n_processes, single_thread=False
63+
)
5064

5165
print("starting mc")
52-
# Start cluster for parallel processing
53-
c, dview, n_processes = cm.cluster.setup_cluster(
54-
backend="local", n_processes=n_processes, single_thread=False
55-
)
5666

5767
rel_params = dict(params["main"])
5868
opts = CNMFParams(params_dict=rel_params)

mesmerize_core/batch_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
COMPUTE_BACKEND_SUBPROCESS = "subprocess" #: subprocess backend
1212
COMPUTE_BACKEND_SLURM = "slurm" #: SLURM backend
1313
COMPUTE_BACKEND_LOCAL = "local"
14+
COMPUTE_BACKEND_ASYNC = "local_async"
1415

1516
COMPUTE_BACKENDS = [
1617
COMPUTE_BACKEND_SUBPROCESS,
1718
COMPUTE_BACKEND_SLURM,
1819
COMPUTE_BACKEND_LOCAL,
20+
COMPUTE_BACKEND_ASYNC,
1921
]
2022

2123
DATAFRAME_COLUMNS = [

mesmerize_core/caiman_extensions/common.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from datetime import datetime
1010
import time
1111
from copy import deepcopy
12+
import asyncio
1213

1314
import numpy as np
1415
import pandas as pd
@@ -25,6 +26,7 @@
2526
COMPUTE_BACKENDS,
2627
COMPUTE_BACKEND_SUBPROCESS,
2728
COMPUTE_BACKEND_LOCAL,
29+
COMPUTE_BACKEND_ASYNC,
2830
get_parent_raw_data_path,
2931
load_batch,
3032
)
@@ -531,19 +533,30 @@ def __init__(self, s: pd.Series):
531533
self.process: Popen = None
532534

533535
def _run_local(
534-
self,
535-
algo: str,
536-
batch_path: Path,
537-
uuid: UUID,
538-
data_path: Union[Path, None],
539-
):
536+
self,
537+
algo: str,
538+
batch_path: Path,
539+
uuid: UUID,
540+
data_path: Union[Path, None],
541+
dview=None
542+
) -> DummyProcess:
543+
coroutine = self._run_local_async(algo, batch_path, uuid, data_path, dview)
544+
asyncio.run(coroutine)
545+
return DummyProcess()
546+
547+
def _run_local_async(
548+
self,
549+
algo: str,
550+
batch_path: Path,
551+
uuid: UUID,
552+
data_path: Union[Path, None],
553+
dview=None
554+
) -> Coroutine:
540555
algo_module = getattr(algorithms, algo)
541-
algo_module.run_algo(
542-
batch_path=str(batch_path), uuid=str(uuid), data_path=str(data_path)
556+
return algo_module.run_algo_async(
557+
batch_path=str(batch_path), uuid=str(uuid), data_path=str(data_path), dview=dview
543558
)
544559

545-
return DummyProcess()
546-
547560
def _run_subprocess(self, runfile_path: str, wait: bool, **kwargs):
548561

549562
# Get the dir that contains the input movie
@@ -636,13 +649,14 @@ def run(self, backend: Optional[str] = None, wait: bool = True, **kwargs):
636649

637650
batch_path = self._series.paths.get_batch_path()
638651

639-
if backend == COMPUTE_BACKEND_LOCAL:
640-
print(f"Running {self._series.uuid} with local backend")
641-
return self._run_local(
652+
if backend in [COMPUTE_BACKEND_LOCAL, COMPUTE_BACKEND_ASYNC]:
653+
print(f"Running {self._series.uuid} with {backend} backend")
654+
return getattr(self, f"_run_{backend}")(
642655
algo=self._series["algo"],
643656
batch_path=batch_path,
644657
uuid=self._series["uuid"],
645658
data_path=get_parent_raw_data_path(),
659+
dview=kwargs.get("dview")
646660
)
647661

648662
# Create the runfile in the batch dir using this Series' UUID as the filename

0 commit comments

Comments
 (0)