Skip to content

Commit 1847471

Browse files
committed
switch from using asyncio to concurrent.futures and add test
1 parent 85f5f53 commit 1847471

File tree

5 files changed

+88
-30
lines changed

5 files changed

+88
-30
lines changed

mesmerize_core/algorithms/cnmf.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
"""Performs CNMF in a separate process"""
22

3-
import asyncio
43
import click
54
import caiman as cm
65
from caiman.source_extraction.cnmf import cnmf as cnmf
76
from caiman.source_extraction.cnmf.params import CNMFParams
8-
import psutil
97
import numpy as np
108
import traceback
119
from pathlib import Path, PurePosixPath
1210
from shutil import move as move_file
13-
import os
1411
import time
1512

1613
# prevent circular import
@@ -25,9 +22,6 @@
2522

2623

2724
def run_algo(batch_path, uuid, data_path: str = None, dview=None):
28-
asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview))
29-
30-
async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None):
3125
algo_start = time.time()
3226
set_parent_raw_data_path(data_path)
3327

mesmerize_core/algorithms/cnmfe.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
import asyncio
21
import click
32
import numpy as np
43
import caiman as cm
54
from caiman.source_extraction.cnmf import cnmf as cnmf
65
from caiman.source_extraction.cnmf.params import CNMFParams
7-
import psutil
86
import traceback
97
from pathlib import Path, PurePosixPath
108
from shutil import move as move_file
11-
import os
129
import time
1310

1411
if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess
@@ -22,9 +19,6 @@
2219

2320

2421
def run_algo(batch_path, uuid, data_path: str = None, dview=None):
25-
asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview))
26-
27-
async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None):
2822
algo_start = time.time()
2923
set_parent_raw_data_path(data_path)
3024

mesmerize_core/algorithms/mcorr.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import traceback
2-
import asyncio
32
import click
43
import caiman as cm
54
from caiman.source_extraction.cnmf.params import CNMFParams
65
from caiman.motion_correction import MotionCorrect
76
from caiman.summary_images import local_correlations_movie_offline
8-
import psutil
97
import os
108
from pathlib import Path, PurePosixPath
119
import numpy as np
@@ -22,9 +20,6 @@
2220

2321

2422
def run_algo(batch_path, uuid, data_path: str = None, dview=None):
25-
asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview))
26-
27-
async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None):
2823
algo_start = time.time()
2924
set_parent_raw_data_path(data_path)
3025

mesmerize_core/caiman_extensions/common.py

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from datetime import datetime
1010
import time
1111
from copy import deepcopy
12-
import asyncio
12+
from concurrent.futures import ThreadPoolExecutor, Future
1313

1414
import numpy as np
1515
import pandas as pd
@@ -515,11 +515,26 @@ def get_parent(self, index: Union[int, str, UUID]) -> Union[UUID, None]:
515515
return r["uuid"]
516516

517517

518-
class DummyProcess:
518+
class Waitable(Protocol):
519+
"""An object that we can call "wait" on"""
520+
def wait(self) -> int: ...
521+
522+
523+
class DummyProcess(Waitable):
519524
"""Dummy process for local backend"""
520525

521-
def wait(self):
522-
pass
526+
def wait(self) -> int:
527+
return 0
528+
529+
530+
class WaitableFuture(Waitable):
531+
"""Adaptor for future returned from Executor.submit"""
532+
def __init__(self, future: Future[None]):
533+
self.future = future
534+
535+
def wait(self) -> int:
536+
self.future.result()
537+
return 0
523538

524539

525540
@pd.api.extensions.register_series_accessor("caiman")
@@ -530,7 +545,7 @@ class CaimanSeriesExtensions:
530545

531546
def __init__(self, s: pd.Series):
532547
self._series = s
533-
self.process: Popen = None
548+
self.process: Optional[Waitable] = None
534549

535550
def _run_local(
536551
self,
@@ -540,9 +555,15 @@ def _run_local(
540555
data_path: Union[Path, None],
541556
dview=None
542557
) -> DummyProcess:
543-
coroutine = self._run_local_async(algo, batch_path, uuid, data_path, dview)
544-
asyncio.run(coroutine)
545-
return DummyProcess()
558+
algo_module = getattr(algorithms, algo)
559+
algo_module.run_algo(
560+
batch_path=str(batch_path),
561+
uuid=str(uuid),
562+
data_path=str(data_path),
563+
dview=dview
564+
)
565+
self.process = DummyProcess()
566+
return self.process
546567

547568
def _run_local_async(
548569
self,
@@ -551,11 +572,18 @@ def _run_local_async(
551572
uuid: UUID,
552573
data_path: Union[Path, None],
553574
dview=None
554-
) -> Coroutine:
575+
) -> WaitableFuture:
555576
algo_module = getattr(algorithms, algo)
556-
return algo_module.run_algo_async(
557-
batch_path=str(batch_path), uuid=str(uuid), data_path=str(data_path), dview=dview
558-
)
577+
with ThreadPoolExecutor(max_workers=1) as executor:
578+
future = executor.submit(
579+
algo_module.run_algo,
580+
batch_path=str(batch_path),
581+
uuid=str(uuid),
582+
data_path=str(data_path),
583+
dview=dview
584+
)
585+
self.process = WaitableFuture(future)
586+
return self.process
559587

560588
def _run_subprocess(self, runfile_path: str, wait: bool, **kwargs):
561589

tests/test_core.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import os
2-
32
import numpy as np
43
from caiman.utils.utils import load_dict_from_hdf5
54
from caiman.source_extraction.cnmf.cnmf import CNMF
@@ -15,9 +14,12 @@
1514
from mesmerize_core.batch_utils import (
1615
DATAFRAME_COLUMNS,
1716
COMPUTE_BACKEND_SUBPROCESS,
17+
COMPUTE_BACKEND_LOCAL,
18+
COMPUTE_BACKEND_ASYNC,
1819
get_full_raw_data_path,
1920
)
2021
from mesmerize_core.utils import IS_WINDOWS
22+
from mesmerize_core.algorithms._utils import ensure_server
2123
from uuid import uuid4
2224
import pytest
2325
import requests
@@ -1336,3 +1338,48 @@ def test_cache():
13361338
assert hex(id(cnmf.cnmf_cache.get_cache().iloc[-1]["return_val"])) == hex(
13371339
id(output)
13381340
)
1341+
1342+
1343+
def test_backends():
1344+
"""test subprocess, local, and async_local backend"""
1345+
set_parent_raw_data_path(vid_dir)
1346+
algo = "mcorr"
1347+
df, batch_path = _create_tmp_batch()
1348+
input_movie_path = get_datafile(algo)
1349+
1350+
# make small version of movie for quick testing
1351+
movie = tifffile.imread(input_movie_path)
1352+
small_movie_path = input_movie_path.parent.joinpath("small_movie.tif")
1353+
tifffile.imwrite(small_movie_path, movie[:1001])
1354+
print(input_movie_path)
1355+
1356+
# put backends that can run in the background first to save time
1357+
backends = [COMPUTE_BACKEND_SUBPROCESS, COMPUTE_BACKEND_ASYNC, COMPUTE_BACKEND_LOCAL]
1358+
for backend in backends:
1359+
df.caiman.add_item(
1360+
algo="mcorr",
1361+
item_name=f"test-{backend}",
1362+
input_movie_path=small_movie_path,
1363+
params=test_params["mcorr"],
1364+
)
1365+
1366+
# run using each backend
1367+
procs = []
1368+
with ensure_server(None) as (dview, _):
1369+
for backend, (_, item) in zip(backends, df.iterrows()):
1370+
procs.append(item.caiman.run(backend=backend, dview=dview, wait=False))
1371+
1372+
# wait for all to finish
1373+
for proc in procs:
1374+
proc.wait()
1375+
1376+
# compare results
1377+
df = load_batch(batch_path)
1378+
for i, item in df.iterrows():
1379+
output = item.mcorr.get_output()
1380+
1381+
if i == 0:
1382+
# save to compare to other results
1383+
first_output = output
1384+
else:
1385+
numpy.testing.assert_array_equal(output, first_output)

0 commit comments

Comments
 (0)