From db184965d69d73429a0c7cbb8ccd39dfbf9d14a7 Mon Sep 17 00:00:00 2001 From: mrava87 Date: Wed, 29 Oct 2025 20:43:15 +0000 Subject: [PATCH 01/11] feat: added multithreading support to stacking ops --- pylops/basicoperators/block.py | 20 +++++++-- pylops/basicoperators/blockdiag.py | 71 +++++++++++++++++++++++++++--- pylops/basicoperators/hstack.py | 65 ++++++++++++++++++++++++--- pylops/basicoperators/vstack.py | 64 ++++++++++++++++++++++++--- 4 files changed, 196 insertions(+), 24 deletions(-) diff --git a/pylops/basicoperators/block.py b/pylops/basicoperators/block.py index cc75a6e7..c56b0058 100644 --- a/pylops/basicoperators/block.py +++ b/pylops/basicoperators/block.py @@ -1,5 +1,6 @@ __all__ = ["Block"] +import concurrent.futures as mt import multiprocessing as mp from typing import Iterable, Optional @@ -65,12 +66,16 @@ class Block(_Block): Alternatively, :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel using - ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel using + ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 Force an array to be flattened after rmatvec. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. @@ -148,10 +153,17 @@ def __init__( ops: Iterable[Iterable[LinearOperator]], nproc: int = 1, forceflat: bool = None, + multiproc: bool = True, dtype: Optional[DTypeLike] = None, ): if nproc > 1: - self.pool = mp.Pool(processes=nproc) + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) super().__init__( - ops=ops, forceflat=forceflat, dtype=dtype, _args_VStack={"nproc": nproc} + ops=ops, + forceflat=forceflat, + dtype=dtype, + _args_VStack={"nproc": nproc, "multiproc": multiproc}, ) diff --git a/pylops/basicoperators/blockdiag.py b/pylops/basicoperators/blockdiag.py index a049a4e8..39523306 100644 --- a/pylops/basicoperators/blockdiag.py +++ b/pylops/basicoperators/blockdiag.py @@ -1,5 +1,6 @@ __all__ = ["BlockDiag"] +import concurrent.futures as mt import multiprocessing as mp import numpy as np @@ -42,8 +43,8 @@ class BlockDiag(LinearOperator): :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel using - ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel using + ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 @@ -54,6 +55,10 @@ class BlockDiag(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
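A minimal usage sketch of the flag documented above, assuming only the public
`BlockDiag` and `MatrixMult` operators from PyLops (note that a later patch in
this series, PATCH 10/11, renames the flag to ``parallel_kind``):

    import numpy as np
    from pylops.basicoperators import BlockDiag, MatrixMult

    G = np.random.normal(0.0, 1.0, (5, 4))
    # multiproc=False backs the worker pool with a ThreadPoolExecutor
    # instead of a multiprocessing.Pool
    Dop = BlockDiag([MatrixMult(G)] * 4, nproc=2, multiproc=False)
    y = Dop @ np.ones(4 * 4)  # each worker applies one diagonal block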
@@ -137,6 +142,7 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, + multiproc: bool = True, dtype: Optional[DTypeLike] = None, ) -> None: self.ops = ops @@ -167,12 +173,15 @@ def __init__( else: dimsd = (self.nops,) forceflat = True - # create pool for multiprocessing + # create pool for multithreading / multiprocessing + self.multiproc = multiproc self._nproc = nproc self.pool: Optional[mp.pool.Pool] = None if self.nproc > 1: - self.pool = mp.Pool(processes=nproc) - + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) self.inoutengine = inoutengine dtype = _get_dtype(ops) if dtype is None else np.dtype(dtype) clinear = all([getattr(oper, "clinear", True) for oper in self.ops]) @@ -252,16 +261,64 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.hstack(ys) return y + def _matvec_multithread(self, x: NDArray) -> NDArray: + if self.pool is None: + raise ValueError + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._matvec, x[self.nnops[iop] : self.nnops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + y = np.hstack(ys) + return y + + def _rmatvec_multithread(self, x: NDArray) -> NDArray: + if self.pool is None: + raise ValueError + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._rmatvec, x[self.mmops[iop] : self.mmops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + y = np.hstack(ys) + return y + def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - y = self._matvec_multiproc(x) + if self.multiproc: + y = self._matvec_multiproc(x) + else: + y = self._matvec_multithread(x) return y def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - y = self._rmatvec_multiproc(x) + if self.multiproc: + y = self._rmatvec_multiproc(x) + else: + y = self._rmatvec_multithread(x) return y + + def close(self): + """Close the pool of workers used for multiprocessing + / multithreading. + """ + if self.pool is not None: + if self.multiproc: + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() + self.pool = None diff --git a/pylops/basicoperators/hstack.py b/pylops/basicoperators/hstack.py index 79b12ad4..fb62cf64 100644 --- a/pylops/basicoperators/hstack.py +++ b/pylops/basicoperators/hstack.py @@ -1,5 +1,6 @@ __all__ = ["HStack"] +import concurrent.futures as mt import multiprocessing as mp import numpy as np @@ -47,8 +48,8 @@ class HStack(LinearOperator): :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel - using ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel + using ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 @@ -59,6 +60,10 @@ class HStack(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
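The same kind of sketch for `HStack`, reusing ``G`` from the sketch above;
threads share memory, so operators and arrays are not pickled, and they pay off
mainly when each child operator releases the GIL (e.g., in BLAS-backed NumPy
calls):

    from pylops.basicoperators import HStack, MatrixMult

    Hop = HStack([MatrixMult(G)] * 4, nproc=2, multiproc=False)
    # forward: workers apply the blocks to slices of x, outputs are summed
    y = Hop @ np.ones(4 * G.shape[1])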
@@ -139,6 +144,7 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, + multiproc: bool = True, dtype: Optional[str] = None, ) -> None: self.ops = ops @@ -161,12 +167,15 @@ def __init__( else: dimsd = (self.nops,) forceflat = True - # create pool for multiprocessing + # create pool for multithreading / multiprocessing + self.multiproc = multiproc self._nproc = nproc self.pool = None if self.nproc > 1: - self.pool = mp.Pool(processes=nproc) - + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) self.inoutengine = inoutengine dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype) clinear = all([getattr(oper, "clinear", True) for oper in self.ops]) @@ -241,6 +250,30 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.hstack(ys) return y + def _matvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + + y = np.sum(ys, axis=0) + return y + + def _rmatvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [(oper._matvec, x) for iop, oper in enumerate(self.ops)], + ) + ) + y = np.hstack(ys) + return y + def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) @@ -250,7 +283,25 @@ def _matvec(self, x: NDArray) -> NDArray: def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: - y = self._rmatvec_serial(x) + if self.multiproc: + y = self._matvec_multiproc(x) + else: + y = self._matvec_multithread(x) else: - y = self._rmatvec_multiproc(x) + if self.multiproc: + y = self._rmatvec_multiproc(x) + else: + y = self._rmatvec_multithread(x) return y + + def close(self): + """Close the pool of workers used for multiprocessing / + multithreading. + """ + if self.pool is not None: + if self.multiproc: + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() + self.pool = None diff --git a/pylops/basicoperators/vstack.py b/pylops/basicoperators/vstack.py index 3f4d50f0..d98ab65c 100644 --- a/pylops/basicoperators/vstack.py +++ b/pylops/basicoperators/vstack.py @@ -1,5 +1,6 @@ __all__ = ["VStack"] +import concurrent.futures as mt import multiprocessing as mp import numpy as np @@ -46,8 +47,8 @@ class VStack(LinearOperator): :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel using - ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel using + ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 @@ -58,6 +59,10 @@ class VStack(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
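And for `VStack`, again reusing ``G``, including the ``close`` method this
patch adds to release the pool explicitly:

    from pylops.basicoperators import VStack, MatrixMult

    Vop = VStack([MatrixMult(G)] * 4, nproc=2, multiproc=False)
    y = Vop @ np.ones(G.shape[1])  # each worker computes one stacked block of y
    Vop.close()  # shuts down the ThreadPoolExecutor created at construction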
@@ -138,6 +143,7 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, + multiproc: bool = True, dtype: Optional[DTypeLike] = None, ) -> None: self.ops = ops @@ -160,11 +166,15 @@ def __init__( else: dims = (self.mops,) forceflat = True - # create pool for multiprocessing + # create pool for multithreading / multiprocessing + self.multiproc = multiproc self._nproc = nproc self.pool = None if self.nproc > 1: - self.pool = mp.Pool(processes=nproc) + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) self.inoutengine = inoutengine dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype) clinear = all([getattr(oper, "clinear", True) for oper in self.ops]) @@ -239,16 +249,58 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.sum(ys, axis=0) return y + def _matvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [(oper._matvec, x) for iop, oper in enumerate(self.ops)], + ) + ) + y = np.hstack(ys) + return y + + def _rmatvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + + y = np.sum(ys, axis=0) + return y + def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - y = self._matvec_multiproc(x) + if self.multiproc: + y = self._matvec_multiproc(x) + else: + y = self._matvec_multithread(x) return y def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - y = self._rmatvec_multiproc(x) + if self.multiproc: + y = self._rmatvec_multiproc(x) + else: + y = self._rmatvec_multithread(x) return y + + def close(self): + """Close the pool of workers used for multiprocessing / + multithreading. 
+ """ + if self.pool is not None: + if self.multiproc: + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() + self.pool = None From d358328555d96d7d2eaa5e1d256cc5c978289b03 Mon Sep 17 00:00:00 2001 From: mrava87 Date: Thu, 30 Oct 2025 22:34:26 +0000 Subject: [PATCH 02/11] fix: correct error in _rmatvec of hstack --- pylops/basicoperators/hstack.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pylops/basicoperators/hstack.py b/pylops/basicoperators/hstack.py index fb62cf64..d87f6854 100644 --- a/pylops/basicoperators/hstack.py +++ b/pylops/basicoperators/hstack.py @@ -278,15 +278,15 @@ def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - y = self._matvec_multiproc(x) - return y - - def _rmatvec(self, x: NDArray) -> NDArray: - if self.nproc == 1: if self.multiproc: y = self._matvec_multiproc(x) else: y = self._matvec_multithread(x) + return y + + def _rmatvec(self, x: NDArray) -> NDArray: + if self.nproc == 1: + y = self._rmatvec_serial(x) else: if self.multiproc: y = self._rmatvec_multiproc(x) From b843841ecd9713b10caecc1ea488d732456f593f Mon Sep 17 00:00:00 2001 From: mrava87 Date: Sun, 2 Nov 2025 10:31:21 +0000 Subject: [PATCH 03/11] feat: added nttot to BlendingContinuous to allow using it in HStack operator --- pylops/waveeqprocessing/blending.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pylops/waveeqprocessing/blending.py b/pylops/waveeqprocessing/blending.py index cc1b54af..f6c4928d 100644 --- a/pylops/waveeqprocessing/blending.py +++ b/pylops/waveeqprocessing/blending.py @@ -4,6 +4,8 @@ "BlendingHalf", ] +from typing import Optional + import numpy as np from pylops import LinearOperator @@ -36,6 +38,9 @@ class BlendingContinuous(LinearOperator): shiftall : :obj:`bool`, optional Shift all shots together (``True``) or one at the time (``False``). Defaults to ``shiftall=False`` (original implementation), however ``shiftall=True`` should be preferred when ``nr`` is small. 
+    nttot : :obj:`int`, optional
+        Total number of time samples in blended data (if ``None``, computed internally
+        based on the maximum ignition time in ``times``)
     dtype : :obj:`str`, optional
         Operator dtype
     name : :obj:`str`, optional
@@ -43,9 +48,6 @@ class BlendingContinuous(LinearOperator):
 
     Attributes
     ----------
-    ntot : :obj:`int`
-        Total number of time samples in blended data (based on the
-        the maximum ignition time in ``times``)
     PadOp : :obj:`pylops.basicoperators.Pad`
         Padding operator used to add one zero at the end of each shot gather
         to avoid boundary effects when shifting
@@ -89,6 +91,7 @@ def __init__(
         dt: float,
         times: NDArray,
         shiftall: bool = False,
+        nttot: Optional[int] = None,
         dtype: DTypeLike = "float64",
         name: str = "B",
     ) -> None:
@@ -99,7 +102,7 @@ def __init__(
         self.dt = dt
         self.times = times
         self.shiftall = shiftall
-        self.nttot = int(np.max(self.times) / self.dt + self.nt + 1)
+        self.nttot = nttot if nttot is not None else g
         if not self.shiftall:
             # original implementation, where each source is shifted indipendently
             self.PadOp = Pad((self.nr, self.nt), ((0, 0), (0, 1)), dtype=self.dtype)

From 79db7bce381d64da50266d3e0f64fb070498293b Mon Sep 17 00:00:00 2001
From: mrava87
Date: Sun, 2 Nov 2025 10:35:41 +0000
Subject: [PATCH 04/11] minor: fix error in self.nttot creation

---
 pylops/waveeqprocessing/blending.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/pylops/waveeqprocessing/blending.py
index f6c4928d..defce44d 100644
--- a/pylops/waveeqprocessing/blending.py
+++ b/pylops/waveeqprocessing/blending.py
@@ -102,7 +102,11 @@ def __init__(
         self.dt = dt
         self.times = times
         self.shiftall = shiftall
-        self.nttot = nttot if nttot is not None else g
+        self.nttot = (
+            nttot
+            if nttot is not None
+            else int(np.max(self.times) / self.dt + self.nt + 1)
+        )
         if not self.shiftall:
             # original implementation, where each source is shifted indipendently
             self.PadOp = Pad((self.nr, self.nt), ((0, 0), (0, 1)), dtype=self.dtype)

From 6ebf4e919391f19d0008d6ad4b8f7f8c9b486451 Mon Sep 17 00:00:00 2001
From: mrava87
Date: Fri, 7 Nov 2025 21:56:02 +0000
Subject: [PATCH 05/11] bug: fix problem in BlockDiag multithread

---
 pylops/basicoperators/blockdiag.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pylops/basicoperators/blockdiag.py
index 39523306..544f453b 100644
--- a/pylops/basicoperators/blockdiag.py
+++ b/pylops/basicoperators/blockdiag.py
@@ -268,7 +268,7 @@ def _matvec_multithread(self, x: NDArray) -> NDArray:
             self.pool.map(
                 lambda args: _matvec_rmatvec_map(*args),
                 [
-                    (oper._matvec, x[self.nnops[iop] : self.nnops[iop + 1]])
+                    (oper._matvec, x[self.mmops[iop] : self.mmops[iop + 1]])
                     for iop, oper in enumerate(self.ops)
                 ],
             )
@@ -283,7 +283,7 @@ def _rmatvec_multithread(self, x: NDArray) -> NDArray:
             self.pool.map(
                 lambda args: _matvec_rmatvec_map(*args),
                 [
-                    (oper._rmatvec, x[self.mmops[iop] : self.mmops[iop + 1]])
+                    (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]])
                     for iop, oper in enumerate(self.ops)
                 ],
             )

From 94e945ce602a89dac28712b36ab6883a9c9c35bd Mon Sep 17 00:00:00 2001
From: mrava87
Date: Fri, 7 Nov 2025 21:56:55 +0000
Subject: [PATCH 06/11] feat: improved H/VStack reductions from threads

---
 pylops/basicoperators/hstack.py | 51 +++++++++++++++++++++++++++------
 pylops/basicoperators/vstack.py | 51 +++++++++++++++++++++++++++------
 2 files changed, 84 insertions(+), 18 deletions(-)

diff --git a/pylops/basicoperators/hstack.py b/pylops/basicoperators/hstack.py
index d87f6854..cefc1373 100644
--- a/pylops/basicoperators/hstack.py
+++ b/pylops/basicoperators/hstack.py
@@ -2,6 +2,7 @@
 
 import concurrent.futures as mt
 import multiprocessing as mp
+import threading
 
 import numpy as np
 import scipy as sp
@@ -18,7 +19,7 @@
     LinearOperator as spLinearOperator,
 )
 
-from typing import Optional, Sequence
+from typing import Callable, Optional, Sequence
 
 from pylops import LinearOperator
 from pylops.basicoperators import MatrixMult, Zero
@@ -31,6 +32,13 @@ def _matvec_rmatvec_map(op, x: NDArray) -> NDArray:
     return op(x).squeeze()
 
 
+def _matvec_map_mt(op: Callable, x: NDArray, y: NDArray, lock: threading.Lock) -> None:
+    """matvec for multithreading with lock"""
+    ylocal = op(x).squeeze()
+    with lock:
+        y[:] += ylocal
+
+
 class HStack(LinearOperator):
     r"""Horizontal stacking.
 
@@ -176,6 +184,7 @@ def __init__(
                 self.pool = mp.Pool(processes=nproc)
             else:
                 self.pool = mt.ThreadPoolExecutor(max_workers=nproc)
+        self.lock = threading.Lock()
         self.inoutengine = inoutengine
         dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype)
         clinear = all([getattr(oper, "clinear", True) for oper in self.ops])
@@ -194,9 +203,16 @@ def nproc(self) -> int:
     @nproc.setter
     def nproc(self, nprocnew: int):
         if self._nproc > 1:
-            self.pool.close()
+            if self.multiproc:
+                self.pool.close()
+                self.pool.join()
+            else:
+                self.pool.shutdown()
         if nprocnew > 1:
-            self.pool = mp.Pool(processes=nprocnew)
+            if self.multiproc:
+                self.pool = mp.Pool(processes=nprocnew)
+            else:
+                self.pool = mt.ThreadPoolExecutor(max_workers=nprocnew)
         self._nproc = nprocnew
 
     def _matvec_serial(self, x: NDArray) -> NDArray:
@@ -250,25 +266,42 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray:
         y = np.hstack(ys)
         return y
 
+    # def _matvec_multithread(self, x: NDArray) -> NDArray:
+    #     ys = list(
+    #         self.pool.map(
+    #             lambda args: _matvec_rmatvec_map(*args),
+    #             [
+    #                 (oper._matvec, x[self.mmops[iop] : self.mmops[iop + 1]])
+    #                 for iop, oper in enumerate(self.ops)
+    #             ],
+    #         )
+    #     )
+    #     y = np.sum(ys, axis=0)
+    #     return y
+
     def _matvec_multithread(self, x: NDArray) -> NDArray:
-        ys = list(
+        y = np.zeros(self.nops, dtype=self.dtype)
+        list(
             self.pool.map(
-                lambda args: _matvec_rmatvec_map(*args),
+                lambda args: _matvec_map_mt(*args),
                 [
-                    (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]])
+                    (
+                        oper._matvec,
+                        x[self.mmops[iop] : self.mmops[iop + 1]],
+                        y,
+                        self.lock,
+                    )
                     for iop, oper in enumerate(self.ops)
                 ],
             )
         )
-
-        y = np.sum(ys, axis=0)
         return y
 
     def _rmatvec_multithread(self, x: NDArray) -> NDArray:
         ys = list(
             self.pool.map(
                 lambda args: _matvec_rmatvec_map(*args),
-                [(oper._matvec, x) for iop, oper in enumerate(self.ops)],
+                [(oper._rmatvec, x) for iop, oper in enumerate(self.ops)],
             )
         )
         y = np.hstack(ys)

diff --git a/pylops/basicoperators/vstack.py
index d98ab65c..d7b0e583 100644
--- a/pylops/basicoperators/vstack.py
+++ b/pylops/basicoperators/vstack.py
@@ -2,6 +2,7 @@
 
 import concurrent.futures as mt
 import multiprocessing as mp
+import threading
 
 import numpy as np
 import scipy as sp
@@ -27,10 +28,17 @@
 
 
 def _matvec_rmatvec_map(op: Callable, x: NDArray) -> NDArray:
-    """matvec/rmatvec for multiprocessing"""
+    """matvec/rmatvec for multiprocessing / multithreading"""
     return op(x).squeeze()
 
 
+def _rmatvec_map_mt(op: Callable, x: NDArray, y: NDArray, lock: threading.Lock) -> None:
+    """rmatvec for multithreading with lock"""
+    ylocal = op(x).squeeze()
+    with lock:
+        y[:] += ylocal
+
+
 class VStack(LinearOperator):
     r"""Vertical stacking.
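The ``_matvec_map_mt``/``_rmatvec_map_mt`` helpers above implement a
lock-guarded in-place reduction. Sketched in isolation (the names below are
illustrative, not part of the patch):

    import threading

    import numpy as np

    y = np.zeros(4)
    lock = threading.Lock()

    def accumulate(op, x, y, lock):
        ylocal = op(x)
        with lock:  # serialize the in-place sum so concurrent += cannot race
            y[:] += ylocal

    # each worker thread adds its operator's partial result into the shared y
    accumulate(lambda x: 2.0 * x, np.ones(4), y, lock)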
@@ -171,10 +179,11 @@ def __init__(
         self._nproc = nproc
         self.pool = None
         if self.nproc > 1:
-            if multiproc:
+            if self.multiproc:
                 self.pool = mp.Pool(processes=nproc)
             else:
                 self.pool = mt.ThreadPoolExecutor(max_workers=nproc)
+        self.lock = threading.Lock()
         self.inoutengine = inoutengine
         dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype)
         clinear = all([getattr(oper, "clinear", True) for oper in self.ops])
@@ -193,9 +202,16 @@ def nproc(self) -> int:
     @nproc.setter
     def nproc(self, nprocnew: int):
         if self._nproc > 1:
-            self.pool.close()
+            if self.multiproc:
+                self.pool.close()
+                self.pool.join()
+            else:
+                self.pool.shutdown()
         if nprocnew > 1:
-            self.pool = mp.Pool(processes=nprocnew)
+            if self.multiproc:
+                self.pool = mp.Pool(processes=nprocnew)
+            else:
+                self.pool = mt.ThreadPoolExecutor(max_workers=nprocnew)
         self._nproc = nprocnew
 
     def _matvec_serial(self, x: NDArray) -> NDArray:
@@ -259,18 +275,35 @@ def _matvec_multithread(self, x: NDArray) -> NDArray:
         y = np.hstack(ys)
         return y
 
+    # def _rmatvec_multithread(self, x: NDArray) -> NDArray:
+    #     ys = list(
+    #         self.pool.map(
+    #             lambda args: _matvec_rmatvec_map(*args),
+    #             [
+    #                 (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]])
+    #                 for iop, oper in enumerate(self.ops)
+    #             ],
+    #         )
+    #     )
+    #     y = np.sum(ys, axis=0)
+    #     return y
+
     def _rmatvec_multithread(self, x: NDArray) -> NDArray:
-        ys = list(
+        y = np.zeros(self.mops, dtype=self.dtype)
+        list(
             self.pool.map(
-                lambda args: _matvec_rmatvec_map(*args),
+                lambda args: _rmatvec_map_mt(*args),
                 [
-                    (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]])
+                    (
+                        oper._rmatvec,
+                        x[self.nnops[iop] : self.nnops[iop + 1]],
+                        y,
+                        self.lock,
+                    )
                     for iop, oper in enumerate(self.ops)
                 ],
             )
         )
-
-        y = np.sum(ys, axis=0)
         return y
 
     def _matvec(self, x: NDArray) -> NDArray:

From e9412866bf71e70b371cdd0c64b9c6638cec0919 Mon Sep 17 00:00:00 2001
From: mrava87
Date: Fri, 7 Nov 2025 21:57:11 +0000
Subject: [PATCH 07/11] test: added multithreading to tests

---
 pytests/test_combine.py | 213 +++++++++++++++++++++-------------------
 1 file changed, 113 insertions(+), 100 deletions(-)

diff --git a/pytests/test_combine.py
index d8092714..f899e8e6 100644
--- a/pytests/test_combine.py
+++ b/pytests/test_combine.py
@@ -351,128 +351,141 @@ def test_BlockDiag(par):
     int(os.environ.get("TEST_CUPY_PYLOPS", 0)) == 1, reason="Not CuPy enabled"
 )
 @pytest.mark.parametrize("par", [(par1), (par2), (par1j), (par2j)])
-def test_VStack_multiproc(par):
-    """Single and multiprocess consistentcy for VStack operator"""
-    np.random.seed(0)
-    nproc = 2
-    G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"])
-    x = np.ones(par["nx"]) + par["imag"] * np.ones(par["nx"])
-    y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"])
-
-    Vop = VStack([MatrixMult(G, dtype=par["dtype"])] * 4, dtype=par["dtype"])
-    Vmultiop = VStack(
-        [MatrixMult(G, dtype=par["dtype"])] * 4, nproc=nproc, dtype=par["dtype"]
-    )
-    assert dottest(
-        Vmultiop,
-        4 * par["ny"],
-        par["nx"],
-        complexflag=0 if par["imag"] == 0 else 3,
-        backend=backend,
-    )
-    # forward
-    assert_array_almost_equal(Vop * x, Vmultiop * x, decimal=4)
-    # adjoint
-    assert_array_almost_equal(Vop.H * y, Vmultiop.H * y, decimal=4)
+def test_VStack_multiproc_multithread(par):
+    """Single and multiprocess/multithreading consistency for VStack operator"""
+    for multiproc in [True, False]:
+        np.random.seed(0)
+        nproc = 2
+        G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"])
+        x = np.ones(par["nx"]) + par["imag"] * np.ones(par["nx"])
+        y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"])
+
+        Vop = VStack([MatrixMult(G, dtype=par["dtype"])] * 4, dtype=par["dtype"])
+        Vmultiop = VStack(
+            [MatrixMult(G, dtype=par["dtype"])] * 4,
+            nproc=nproc,
+            multiproc=multiproc,
+            dtype=par["dtype"],
+        )
+        assert dottest(
+            Vmultiop,
+            4 * par["ny"],
+            par["nx"],
+            complexflag=0 if par["imag"] == 0 else 3,
+            backend=backend,
+        )
+        # forward
+        assert_array_almost_equal(Vop * x, Vmultiop * x, decimal=4)
+        # adjoint
+        assert_array_almost_equal(Vop.H * y, Vmultiop.H * y, decimal=4)
 
-    # close pool
-    Vmultiop.pool.close()
+        # close pool
+        Vmultiop.close()
 
 
 @pytest.mark.skipif(
     int(os.environ.get("TEST_CUPY_PYLOPS", 0)) == 1, reason="Not CuPy enabled"
 )
 @pytest.mark.parametrize("par", [(par2), (par2j)])
-def test_HStack_multiproc(par):
-    """Single and multiprocess consistentcy for HStack operator"""
-    np.random.seed(0)
-    nproc = 2
-    G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"])
-    x = np.ones(4 * par["nx"]) + par["imag"] * np.ones(4 * par["nx"])
-    y = np.ones(par["ny"]) + par["imag"] * np.ones(par["ny"])
-
-    Hop = HStack([MatrixMult(G, dtype=par["dtype"])] * 4, dtype=par["dtype"])
-    Hmultiop = HStack(
-        [MatrixMult(G, dtype=par["dtype"])] * 4, nproc=nproc, dtype=par["dtype"]
-    )
-    assert dottest(
-        Hmultiop,
-        par["ny"],
-        4 * par["nx"],
-        complexflag=0 if par["imag"] == 0 else 3,
-        backend=backend,
-    )
-    # forward
-    assert_array_almost_equal(Hop * x, Hmultiop * x, decimal=4)
-    # adjoint
-    assert_array_almost_equal(Hop.H * y, Hmultiop.H * y, decimal=4)
+def test_HStack_multiproc_multithread(par):
+    """Single and multiprocess/multithreading consistency for HStack operator"""
+    for multiproc in [True, False]:
+        np.random.seed(0)
+        nproc = 2
+        G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"])
+        x = np.ones(4 * par["nx"]) + par["imag"] * np.ones(4 * par["nx"])
+        y = np.ones(par["ny"]) + par["imag"] * np.ones(par["ny"])
+
+        Hop = HStack([MatrixMult(G, dtype=par["dtype"])] * 4, dtype=par["dtype"])
+        Hmultiop = HStack(
+            [MatrixMult(G, dtype=par["dtype"])] * 4,
+            nproc=nproc,
+            multiproc=multiproc,
+            dtype=par["dtype"],
+        )
+        assert dottest(
+            Hmultiop,
+            par["ny"],
+            4 * par["nx"],
+            complexflag=0 if par["imag"] == 0 else 3,
+            backend=backend,
+        )
+        # forward
+        assert_array_almost_equal(Hop * x, Hmultiop * x, decimal=4)
+        # adjoint
+        assert_array_almost_equal(Hop.H * y, Hmultiop.H * y, decimal=4)
 
-    # close pool
-    Hmultiop.pool.close()
+        # close pool
+        Hmultiop.close()
 
 
 @pytest.mark.skipif(
     int(os.environ.get("TEST_CUPY_PYLOPS", 0)) == 1, reason="Not CuPy enabled"
 )
 @pytest.mark.parametrize("par", [(par1), (par2), (par1j), (par2j)])
-def test_Block_multiproc(par):
-    """Single and multiprocess consistentcy for Block operator"""
-    np.random.seed(0)
-    nproc = 2
-    G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"])
-    Gvert = [MatrixMult(G, dtype=par["dtype"])] * 2
-    Ghor = [Gvert] * 4
-    x = np.ones(2 * par["nx"]) + par["imag"] * np.ones(2 * par["nx"])
-    y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"])
-
-    Bop = Block(Ghor, dtype=par["dtype"])
-    Bmultiop = Block(Ghor, nproc=nproc, dtype=par["dtype"])
-    assert dottest(
-        Bmultiop,
-        4 * par["ny"],
-        2 * par["nx"],
-        complexflag=0 if par["imag"] == 0 else 3,
-        backend=backend,
-    )
-    # forward
-    assert_array_almost_equal(Bop * x, Bmultiop * x, decimal=3)
-    # adjoint
-    assert_array_almost_equal(Bop.H * y, Bmultiop.H * y, decimal=3)
+def test_Block_multiproc_multithread(par):
"""Single and multiprocess/multithreading consistentcy for Block operator""" + for multiproc in [True, False]: + np.random.seed(0) + nproc = 2 + G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) + Gvert = [MatrixMult(G, dtype=par["dtype"])] * 2 + Ghor = [Gvert] * 4 + x = np.ones(2 * par["nx"]) + par["imag"] * np.ones(2 * par["nx"]) + y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"]) + + Bop = Block(Ghor, dtype=par["dtype"]) + Bmultiop = Block(Ghor, nproc=nproc, multiproc=multiproc, dtype=par["dtype"]) + assert dottest( + Bmultiop, + 4 * par["ny"], + 2 * par["nx"], + complexflag=0 if par["imag"] == 0 else 3, + backend=backend, + ) + # forward + assert_array_almost_equal(Bop * x, Bmultiop * x, decimal=3) + # adjoint + assert_array_almost_equal(Bop.H * y, Bmultiop.H * y, decimal=3) - # close pool - Bmultiop.pool.close() + # close pool + Bmultiop.Op.close() @pytest.mark.skipif( int(os.environ.get("TEST_CUPY_PYLOPS", 0)) == 1, reason="Not CuPy enabled" ) @pytest.mark.parametrize("par", [(par1), (par2), (par1j), (par2j)]) -def test_BlockDiag_multiproc(par): - """Single and multiprocess consistentcy for BlockDiag operator""" - np.random.seed(0) - nproc = 2 - G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) - x = np.ones(4 * par["nx"]) + par["imag"] * np.ones(4 * par["nx"]) - y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"]) - - BDop = BlockDiag([MatrixMult(G, dtype=par["dtype"])] * 4, dtype=par["dtype"]) - BDmultiop = BlockDiag( - [MatrixMult(G, dtype=par["dtype"])] * 4, nproc=nproc, dtype=par["dtype"] - ) - assert dottest( - BDmultiop, - 4 * par["ny"], - 4 * par["nx"], - complexflag=0 if par["imag"] == 0 else 3, - backend=backend, - ) - # forward - assert_array_almost_equal(BDop * x, BDmultiop * x, decimal=4) - # adjoint - assert_array_almost_equal(BDop.H * y, BDmultiop.H * y, decimal=4) +def test_BlockDiag_multiproc_multithread(par): + """Single and multiprocess/multithreading consistentcy for BlockDiag operator""" + for multiproc in [True, False]: + np.random.seed(0) + nproc = 2 + G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) + x = np.ones(4 * par["nx"]) + par["imag"] * np.ones(4 * par["nx"]) + y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"]) + + BDop = BlockDiag([MatrixMult(G, dtype=par["dtype"])] * 4, dtype=par["dtype"]) + BDmultiop = BlockDiag( + [MatrixMult(G, dtype=par["dtype"])] * 4, + nproc=nproc, + multiproc=multiproc, + dtype=par["dtype"], + ) + assert dottest( + BDmultiop, + 4 * par["ny"], + 4 * par["nx"], + complexflag=0 if par["imag"] == 0 else 3, + backend=backend, + ) + # forward + assert_array_almost_equal(BDop * x, BDmultiop * x, decimal=4) + # adjoint + assert_array_almost_equal(BDop.H * y, BDmultiop.H * y, decimal=4) - # close pool - BDmultiop.pool.close() + # close pool + BDmultiop.close() @pytest.mark.parametrize("par", [(par1j), (par2j)]) From 49132643193922062a3853a5440a50a111817c05 Mon Sep 17 00:00:00 2001 From: mrava87 Date: Sun, 9 Nov 2025 21:55:09 +0000 Subject: [PATCH 08/11] minor: fix indentation in docstring --- pylops/signalprocessing/chirpradon3d.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pylops/signalprocessing/chirpradon3d.py b/pylops/signalprocessing/chirpradon3d.py index cad96512..d64a4b7a 100755 --- a/pylops/signalprocessing/chirpradon3d.py +++ b/pylops/signalprocessing/chirpradon3d.py @@ -49,8 +49,8 @@ class ChirpRadon3D(LinearOperator): Name of operator (to be used by 
     **kwargs_fftw
-        Arbitrary keyword arguments for :py:class:`pyfftw.FTTW`
-        (reccomended: ``flags=('FFTW_ESTIMATE', ), threads=NTHREADS``)
+        Arbitrary keyword arguments for :py:class:`pyfftw.FFTW`
+        (recommended: ``flags=('FFTW_ESTIMATE', ), threads=NTHREADS``)
 
     Attributes
     ----------

From 2e289f5fd9285b2bbc218a3b72ae1105e78dfe2f Mon Sep 17 00:00:00 2001
From: mrava87
Date: Sun, 9 Nov 2025 21:57:22 +0000
Subject: [PATCH 09/11] feat: added kwargs_fft to blending ops

---
 pylops/waveeqprocessing/blending.py | 33 +++++++++++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/pylops/waveeqprocessing/blending.py
index defce44d..ad96b732 100644
--- a/pylops/waveeqprocessing/blending.py
+++ b/pylops/waveeqprocessing/blending.py
@@ -45,6 +45,10 @@ class BlendingContinuous(LinearOperator):
         Operator dtype
     name : :obj:`str`, optional
         Name of operator (to be used by :func:`pylops.utils.describe.describe`)
+    **kwargs_fft
+        .. versionadded:: 2.6.0
+
+        Arbitrary keyword arguments to be passed to the Shift operator
 
     Attributes
     ----------
@@ -94,6 +98,7 @@ def __init__(
         nttot: Optional[int] = None,
         dtype: DTypeLike = "float64",
         name: str = "B",
+        **kwargs_fft,
     ) -> None:
@@ -131,6 +136,7 @@ def __init__(
                         sampling=self.dt,
                         real=True,
                         dtype=self.dtype,
+                        **kwargs_fft,
                     )
                 )
         else:
@@ -149,6 +155,7 @@ def __init__(
                 sampling=self.dt,
                 real=True,
                 dtype=self.dtype,
+                **kwargs_fft,
             )
             self.diff = diff
@@ -250,6 +257,7 @@ def BlendingGroup(
     nproc: int = 1,
     dtype: DTypeLike = "float64",
     name: str = "B",
+    **kwargs_fft,
 ) -> LinearOperator:
     r"""Group blending operator
 
@@ -284,6 +292,10 @@ def BlendingGroup(
         Operator dtype
     name : :obj:`str`, optional
         Name of operator (to be used by :func:`pylops.utils.describe.describe`)
+    **kwargs_fft
+        .. versionadded:: 2.6.0
+
+        Arbitrary keyword arguments to be passed to the Shift operator
 
     Returns
     -------
@@ -320,7 +332,13 @@ def BlendingGroup(
             Hop = []
             for j in range(group_size):
                 ShiftOp = Shift(
-                    (nr, nt), times[j, i], axis=1, sampling=dt, real=False, dtype=dtype
+                    (nr, nt),
+                    times[j, i],
+                    axis=1,
+                    sampling=dt,
+                    real=False,
+                    dtype=dtype,
+                    **kwargs_fft,
                 )
                 Hop.append(ShiftOp)
             Bop.append(HStack(Hop))
@@ -342,6 +360,7 @@ def BlendingHalf(
     nproc: int = 1,
     dtype: DTypeLike = "float64",
     name: str = "B",
+    **kwargs_fft,
 ) -> LinearOperator:
     r"""Half blending operator
 
@@ -376,6 +395,10 @@ def BlendingHalf(
         Operator dtype
     name : :obj:`str`, optional
         Name of operator (to be used by :func:`pylops.utils.describe.describe`)
+    **kwargs_fft
+        .. 
versionadded:: 2.6.0 + + Arbitrary keyword arguments to be passed to the Shift operator Returns ------- @@ -412,7 +435,13 @@ def BlendingHalf( OpShift = [] for i in range(n_groups): ShiftOp = Shift( - (nr, nt), times[j, i], axis=1, sampling=dt, real=False, dtype=dtype + (nr, nt), + times[j, i], + axis=1, + sampling=dt, + real=False, + dtype=dtype, + **kwargs_fft, ) OpShift.append(ShiftOp) Dop = BlockDiag(OpShift, nproc=nproc) From 79c2087309487c56517951b64fa0b860bf0fc3ea Mon Sep 17 00:00:00 2001 From: mrava87 Date: Mon, 10 Nov 2025 21:50:01 +0000 Subject: [PATCH 10/11] feat: switched to parallel_kind in stacking ops --- pylops/basicoperators/block.py | 22 +++++++------- pylops/basicoperators/blockdiag.py | 48 ++++++++++++++++-------------- pylops/basicoperators/hstack.py | 48 +++++++++++++----------------- pylops/basicoperators/vstack.py | 48 +++++++++++++----------------- pytests/test_combine.py | 18 ++++++----- 5 files changed, 86 insertions(+), 98 deletions(-) diff --git a/pylops/basicoperators/block.py b/pylops/basicoperators/block.py index c56b0058..97aa6583 100644 --- a/pylops/basicoperators/block.py +++ b/pylops/basicoperators/block.py @@ -72,18 +72,21 @@ class Block(_Block): .. versionadded:: 2.2.0 Force an array to be flattened after rmatvec. - multiproc : :obj:`bool`, optional + parallel_kind : :obj:`str`, optional .. versionadded:: 2.6.0 - Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. + Parallelism kind when ``nproc>1``. Can be ``multiproc`` (using + :mod:`multiprocessing`) or ``multithread`` (using + :class:`concurrent.futures.ThreadPoolExecutor`). Defaults + to ``multiproc``. dtype : :obj:`str`, optional Type of elements in input array. Attributes ---------- - pool : :obj:`multiprocessing.Pool` or :obj:`None` - Pool of workers used to evaluate the N operators in parallel using - ``multiprocessing``. When ``nproc=1``, no pool is created (i.e., + pool : :obj:`multiprocessing.Pool` or :obj:`concurrent.futures.ThreadPoolExecutor` or :obj:`None` + Pool of workers used to evaluate the N operators in parallel. + When ``nproc=1``, no pool is created (i.e., ``pool=None``). shape : :obj:`tuple` Operator shape. @@ -153,17 +156,12 @@ def __init__( ops: Iterable[Iterable[LinearOperator]], nproc: int = 1, forceflat: bool = None, - multiproc: bool = True, + parallel_kind: str = "multiproc", dtype: Optional[DTypeLike] = None, ): - if nproc > 1: - if multiproc: - self.pool = mp.Pool(processes=nproc) - else: - self.pool = mt.ThreadPoolExecutor(max_workers=nproc) super().__init__( ops=ops, forceflat=forceflat, dtype=dtype, - _args_VStack={"nproc": nproc, "multiproc": multiproc}, + _args_VStack={"nproc": nproc, "parallel_kind": parallel_kind}, ) diff --git a/pylops/basicoperators/blockdiag.py b/pylops/basicoperators/blockdiag.py index 544f453b..f6833864 100644 --- a/pylops/basicoperators/blockdiag.py +++ b/pylops/basicoperators/blockdiag.py @@ -55,10 +55,13 @@ class BlockDiag(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. - multiproc : :obj:`bool`, optional + parallel_kind : :obj:`str`, optional .. versionadded:: 2.6.0 - Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. + Parallelism kind when ``nproc>1``. Can be ``multiproc`` (using + :mod:`multiprocessing`) or ``multithread`` (using + :class:`concurrent.futures.ThreadPoolExecutor`). Defaults + to ``multiproc``. 
dtype : :obj:`str`, optional Type of elements in input array. @@ -80,10 +83,9 @@ class BlockDiag(LinearOperator): Shape of the array after the forward, but before flattening. For example, ``y_reshaped = (Op * x.ravel()).reshape(Op.dimsd)``. - pool : :obj:`multiprocessing.Pool` or :obj:`None` - Pool of workers used to evaluate the N operators in parallel using - ``multiprocessing``. When ``nproc=1``, no pool is created (i.e., - ``pool=None``). + pool : :obj:`multiprocessing.Pool` or :obj:`concurrent.futures.ThreadPoolExecutor` or :obj:`None` + Pool of workers used to evaluate the N operators in parallel. + When ``nproc=1``, no pool is created (i.e., ``pool=None``). shape : :obj:`tuple` Operator shape. @@ -142,9 +144,12 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, - multiproc: bool = True, + parallel_kind: str = "multiproc", dtype: Optional[DTypeLike] = None, ) -> None: + if parallel_kind not in ["multiproc", "multithread"]: + raise ValueError("parallel_kind must be 'multiproc' or 'multithread'") + # identify dimensions self.ops = ops mops = np.zeros(len(ops), dtype=int) nops = np.zeros(len(ops), dtype=int) @@ -174,11 +179,11 @@ def __init__( dimsd = (self.nops,) forceflat = True # create pool for multithreading / multiprocessing - self.multiproc = multiproc + self.parallel_kind = parallel_kind self._nproc = nproc self.pool: Optional[mp.pool.Pool] = None if self.nproc > 1: - if multiproc: + if self.parallel_kind == "multiproc": self.pool = mp.Pool(processes=nproc) else: self.pool = mt.ThreadPoolExecutor(max_workers=nproc) @@ -200,9 +205,16 @@ def nproc(self) -> int: @nproc.setter def nproc(self, nprocnew: int) -> None: if self._nproc > 1 and self.pool is not None: - self.pool.close() + if self.parallel_kind == "multiproc": + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() if nprocnew > 1: - self.pool = mp.Pool(processes=nprocnew) + if self.parallel_kind == "multiproc": + self.pool = mp.Pool(processes=nprocnew) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nprocnew) self._nproc = nprocnew def _matvec_serial(self, x: NDArray) -> NDArray: @@ -236,8 +248,6 @@ def _rmatvec_serial(self, x: NDArray) -> NDArray: return y def _matvec_multiproc(self, x: NDArray) -> NDArray: - if self.pool is None: - raise ValueError ys = self.pool.starmap( _matvec_rmatvec_map, [ @@ -249,8 +259,6 @@ def _matvec_multiproc(self, x: NDArray) -> NDArray: return y def _rmatvec_multiproc(self, x: NDArray) -> NDArray: - if self.pool is None: - raise ValueError ys = self.pool.starmap( _matvec_rmatvec_map, [ @@ -262,8 +270,6 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: return y def _matvec_multithread(self, x: NDArray) -> NDArray: - if self.pool is None: - raise ValueError ys = list( self.pool.map( lambda args: _matvec_rmatvec_map(*args), @@ -277,8 +283,6 @@ def _matvec_multithread(self, x: NDArray) -> NDArray: return y def _rmatvec_multithread(self, x: NDArray) -> NDArray: - if self.pool is None: - raise ValueError ys = list( self.pool.map( lambda args: _matvec_rmatvec_map(*args), @@ -295,7 +299,7 @@ def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - if self.multiproc: + if self.parallel_kind == "multiproc": y = self._matvec_multiproc(x) else: y = self._matvec_multithread(x) @@ -305,7 +309,7 @@ def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - if self.multiproc: + if self.parallel_kind == "multiproc": y = self._rmatvec_multiproc(x) else: y = 
self._rmatvec_multithread(x) @@ -316,7 +320,7 @@ def close(self): / multithreading. """ if self.pool is not None: - if self.multiproc: + if self.parallel_kind == "multiproc": self.pool.close() self.pool.join() else: diff --git a/pylops/basicoperators/hstack.py b/pylops/basicoperators/hstack.py index cefc1373..bf94a02a 100644 --- a/pylops/basicoperators/hstack.py +++ b/pylops/basicoperators/hstack.py @@ -68,10 +68,13 @@ class HStack(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. - multiproc : :obj:`bool`, optional + parallel_kind : :obj:`str`, optional .. versionadded:: 2.6.0 - Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. + Parallelism kind when ``nproc>1``. Can be ``multiproc`` (using + :mod:`multiprocessing`) or ``multithread`` (using + :class:`concurrent.futures.ThreadPoolExecutor`). Defaults + to ``multiproc``. dtype : :obj:`str`, optional Type of elements in input array. @@ -91,10 +94,9 @@ class HStack(LinearOperator): Shape of the array after the forward, but before flattening. For example, ``y_reshaped = (Op * x.ravel()).reshape(Op.dimsd)``. - pool : :obj:`multiprocessing.Pool` or :obj:`None` - Pool of workers used to evaluate the N operators in parallel using - ``multiprocessing``. When ``nproc=1``, no pool is created (i.e., - ``pool=None``). + pool : :obj:`multiprocessing.Pool` or :obj:`concurrent.futures.ThreadPoolExecutor` or :obj:`None` + Pool of workers used to evaluate the N operators in parallel. + When ``nproc=1``, no pool is created (i.e., ``pool=None``). shape : :obj:`tuple` Operator shape. @@ -152,9 +154,12 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, - multiproc: bool = True, + parallel_kind: str = "multiproc", dtype: Optional[str] = None, ) -> None: + if parallel_kind not in ["multiproc", "multithread"]: + raise ValueError("parallel_kind must be 'multiproc' or 'multithread'") + # identify dimensions self.ops = ops mops = np.zeros(len(ops), dtype=int) for iop, oper in enumerate(ops): @@ -176,11 +181,11 @@ def __init__( dimsd = (self.nops,) forceflat = True # create pool for multithreading / multiprocessing - self.multiproc = multiproc + self.parallel_kind = parallel_kind self._nproc = nproc self.pool = None if self.nproc > 1: - if multiproc: + if self.parallel_kind == "multiproc": self.pool = mp.Pool(processes=nproc) else: self.pool = mt.ThreadPoolExecutor(max_workers=nproc) @@ -202,14 +207,14 @@ def nproc(self) -> int: @nproc.setter def nproc(self, nprocnew: int): - if self._nproc > 1: - if self.multiproc: + if self._nproc > 1 and self.pool is not None: + if self.parallel_kind == "multiproc": self.pool.close() self.pool.join() else: self.pool.shutdown() if nprocnew > 1: - if self.multiproc: + if self.parallel_kind == "multiproc": self.pool = mp.Pool(processes=nprocnew) else: self.pool = mt.ThreadPoolExecutor(max_workers=nprocnew) @@ -266,19 +271,6 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.hstack(ys) return y - # def _matvec_multithread(self, x: NDArray) -> NDArray: - # ys = list( - # self.pool.map( - # lambda args: _matvec_rmatvec_map(*args), - # [ - # (oper._matvec, x[self.mmops[iop] : self.mmops[iop + 1]]) - # for iop, oper in enumerate(self.ops) - # ], - # ) - # ) - # y = np.sum(ys, axis=0) - # return y - def _matvec_multithread(self, x: NDArray) -> NDArray: y = np.zeros(self.nops, dtype=self.dtype) list( @@ -311,7 +303,7 @@ def _matvec(self, 
x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - if self.multiproc: + if self.parallel_kind == "multiproc": y = self._matvec_multiproc(x) else: y = self._matvec_multithread(x) @@ -321,7 +313,7 @@ def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - if self.multiproc: + if self.parallel_kind == "multiproc": y = self._rmatvec_multiproc(x) else: y = self._rmatvec_multithread(x) @@ -332,7 +324,7 @@ def close(self): multithreading. """ if self.pool is not None: - if self.multiproc: + if self.parallel_kind == "multiproc": self.pool.close() self.pool.join() else: diff --git a/pylops/basicoperators/vstack.py b/pylops/basicoperators/vstack.py index d7b0e583..c1dcb7bb 100644 --- a/pylops/basicoperators/vstack.py +++ b/pylops/basicoperators/vstack.py @@ -67,10 +67,13 @@ class VStack(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. - multiproc : :obj:`bool`, optional + parallel_kind : :obj:`str`, optional .. versionadded:: 2.6.0 - Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. + Parallelism kind when ``nproc>1``. Can be ``multiproc`` (using + :mod:`multiprocessing`) or ``multithread`` (using + :class:`concurrent.futures.ThreadPoolExecutor`). Defaults + to ``multiproc``. dtype : :obj:`str`, optional Type of elements in input array. @@ -90,10 +93,9 @@ class VStack(LinearOperator): Shape of the array after the forward, but before flattening. For example, ``y_reshaped = (Op * x.ravel()).reshape(Op.dimsd)``. - pool : :obj:`multiprocessing.Pool` or :obj:`None` - Pool of workers used to evaluate the N operators in parallel using - ``multiprocessing``. When ``nproc=1``, no pool is created (i.e., - ``pool=None``). + pool : :obj:`multiprocessing.Pool` or :obj:`concurrent.futures.ThreadPoolExecutor` or :obj:`None` + Pool of workers used to evaluate the N operators in parallel. + When ``nproc=1``, no pool is created (i.e., ``pool=None``). shape : :obj:`tuple` Operator shape. 
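A sketch of the reworked interface, reusing ``G`` and the operators from the
earlier sketches; any value other than ``"multiproc"`` or ``"multithread"``
now raises a ``ValueError`` at construction time:

    Vop = VStack([MatrixMult(G)] * 4, nproc=2, parallel_kind="multithread")
    y = Vop @ np.ones(G.shape[1])
    Vop.close()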
@@ -151,9 +153,12 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, - multiproc: bool = True, + parallel_kind: str = "multiproc", dtype: Optional[DTypeLike] = None, ) -> None: + if parallel_kind not in ["multiproc", "multithread"]: + raise ValueError("parallel_kind must be 'multiproc' or 'multithread'") + # identify dimensions self.ops = ops nops = np.zeros(len(self.ops), dtype=int) for iop, oper in enumerate(ops): @@ -175,11 +180,11 @@ def __init__( dims = (self.mops,) forceflat = True # create pool for multithreading / multiprocessing - self.multiproc = multiproc + self.parallel_kind = parallel_kind self._nproc = nproc self.pool = None if self.nproc > 1: - if self.multiproc: + if self.parallel_kind == "multiproc": self.pool = mp.Pool(processes=nproc) else: self.pool = mt.ThreadPoolExecutor(max_workers=nproc) @@ -201,14 +206,14 @@ def nproc(self) -> int: @nproc.setter def nproc(self, nprocnew: int): - if self._nproc > 1: - if self.multiproc: + if self._nproc > 1 and self.pool is not None: + if self.parallel_kind == "multiproc": self.pool.close() self.pool.join() else: self.pool.shutdown() if nprocnew > 1: - if self.multiproc: + if self.parallel_kind == "multiproc": self.pool = mp.Pool(processes=nprocnew) else: self.pool = mt.ThreadPoolExecutor(max_workers=nprocnew) @@ -275,19 +280,6 @@ def _matvec_multithread(self, x: NDArray) -> NDArray: y = np.hstack(ys) return y - # def _rmatvec_multithread(self, x: NDArray) -> NDArray: - # ys = list( - # self.pool.map( - # lambda args: _matvec_rmatvec_map(*args), - # [ - # (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]]) - # for iop, oper in enumerate(self.ops) - # ], - # ) - # ) - # y = np.sum(ys, axis=0) - # return y - def _rmatvec_multithread(self, x: NDArray) -> NDArray: y = np.zeros(self.mops, dtype=self.dtype) list( @@ -310,7 +302,7 @@ def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - if self.multiproc: + if self.parallel_kind == "multiproc": y = self._matvec_multiproc(x) else: y = self._matvec_multithread(x) @@ -320,7 +312,7 @@ def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - if self.multiproc: + if self.parallel_kind == "multiproc": y = self._rmatvec_multiproc(x) else: y = self._rmatvec_multithread(x) @@ -331,7 +323,7 @@ def close(self): multithreading. 
""" if self.pool is not None: - if self.multiproc: + if self.parallel_kind == "multiproc": self.pool.close() self.pool.join() else: diff --git a/pytests/test_combine.py b/pytests/test_combine.py index f899e8e6..980714d4 100644 --- a/pytests/test_combine.py +++ b/pytests/test_combine.py @@ -353,7 +353,7 @@ def test_BlockDiag(par): @pytest.mark.parametrize("par", [(par1), (par2), (par1j), (par2j)]) def test_VStack_multiproc_multithread(par): """Single and multiprocess/multithreading consistentcy for VStack operator""" - for multiproc in [True, False]: + for parallel_kind in ["multiproc", "multithread"]: np.random.seed(0) nproc = 2 G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) @@ -364,7 +364,7 @@ def test_VStack_multiproc_multithread(par): Vmultiop = VStack( [MatrixMult(G, dtype=par["dtype"])] * 4, nproc=nproc, - multiproc=multiproc, + parallel_kind=parallel_kind, dtype=par["dtype"], ) assert dottest( @@ -389,7 +389,7 @@ def test_VStack_multiproc_multithread(par): @pytest.mark.parametrize("par", [(par2), (par2j)]) def test_HStack_multiproc_multithread(par): """Single and multiprocess/multithreading consistentcy for HStack operator""" - for multiproc in [True, False]: + for parallel_kind in ["multiproc", "multithread"]: np.random.seed(0) nproc = 2 G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) @@ -400,7 +400,7 @@ def test_HStack_multiproc_multithread(par): Hmultiop = HStack( [MatrixMult(G, dtype=par["dtype"])] * 4, nproc=nproc, - multiproc=multiproc, + parallel_kind=parallel_kind, dtype=par["dtype"], ) assert dottest( @@ -425,7 +425,7 @@ def test_HStack_multiproc_multithread(par): @pytest.mark.parametrize("par", [(par1), (par2), (par1j), (par2j)]) def test_Block_multiproc_multithread(par): """Single and multiprocess/multithreading consistentcy for Block operator""" - for multiproc in [True, False]: + for parallel_kind in ["multiproc", "multithread"]: np.random.seed(0) nproc = 2 G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) @@ -435,7 +435,9 @@ def test_Block_multiproc_multithread(par): y = np.ones(4 * par["ny"]) + par["imag"] * np.ones(4 * par["ny"]) Bop = Block(Ghor, dtype=par["dtype"]) - Bmultiop = Block(Ghor, nproc=nproc, multiproc=multiproc, dtype=par["dtype"]) + Bmultiop = Block( + Ghor, nproc=nproc, parallel_kind=parallel_kind, dtype=par["dtype"] + ) assert dottest( Bmultiop, 4 * par["ny"], @@ -458,7 +460,7 @@ def test_Block_multiproc_multithread(par): @pytest.mark.parametrize("par", [(par1), (par2), (par1j), (par2j)]) def test_BlockDiag_multiproc_multithread(par): """Single and multiprocess/multithreading consistentcy for BlockDiag operator""" - for multiproc in [True, False]: + for parallel_kind in ["multiproc", "multithread"]: np.random.seed(0) nproc = 2 G = np.random.normal(0, 10, (par["ny"], par["nx"])).astype(par["dtype"]) @@ -469,7 +471,7 @@ def test_BlockDiag_multiproc_multithread(par): BDmultiop = BlockDiag( [MatrixMult(G, dtype=par["dtype"])] * 4, nproc=nproc, - multiproc=multiproc, + parallel_kind=parallel_kind, dtype=par["dtype"], ) assert dottest( From 695439421aef2dafc99cb6fd883bf1a998ca5fc0 Mon Sep 17 00:00:00 2001 From: mrava87 Date: Mon, 10 Nov 2025 21:54:59 +0000 Subject: [PATCH 11/11] minor: remove unused import from block --- pylops/basicoperators/block.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pylops/basicoperators/block.py b/pylops/basicoperators/block.py index 97aa6583..e98f1cda 100644 --- a/pylops/basicoperators/block.py +++ b/pylops/basicoperators/block.py @@ -1,7 +1,5 @@ 
__all__ = ["Block"] -import concurrent.futures as mt -import multiprocessing as mp from typing import Iterable, Optional from pylops import LinearOperator
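Taken together, an end-to-end sketch of the interface the series converges on,
assuming only public PyLops names:

    import numpy as np
    from pylops.basicoperators import BlockDiag, MatrixMult

    G = np.random.normal(0.0, 1.0, (5, 4))
    Dop = BlockDiag([MatrixMult(G)] * 4, nproc=2, parallel_kind="multithread")
    y = Dop @ np.ones(16)  # forward: workers each apply one diagonal block
    xadj = Dop.H @ y       # adjoint: workers split y across the blocks
    Dop.close()            # shut down the ThreadPoolExecutor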