diff --git a/pylops/basicoperators/block.py b/pylops/basicoperators/block.py index cc75a6e7..c56b0058 100644 --- a/pylops/basicoperators/block.py +++ b/pylops/basicoperators/block.py @@ -1,5 +1,6 @@ __all__ = ["Block"] +import concurrent.futures as mt import multiprocessing as mp from typing import Iterable, Optional @@ -65,12 +66,16 @@ class Block(_Block): Alternatively, :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel using - ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel using + ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 Force an array to be flattened after rmatvec. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
@@ -148,10 +153,17 @@ def __init__( ops: Iterable[Iterable[LinearOperator]], nproc: int = 1, forceflat: bool = None, + multiproc: bool = True, dtype: Optional[DTypeLike] = None, ): if nproc > 1: - self.pool = mp.Pool(processes=nproc) + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) super().__init__( - ops=ops, forceflat=forceflat, dtype=dtype, _args_VStack={"nproc": nproc} + ops=ops, + forceflat=forceflat, + dtype=dtype, + _args_VStack={"nproc": nproc, "multiproc": multiproc}, ) diff --git a/pylops/basicoperators/blockdiag.py b/pylops/basicoperators/blockdiag.py index a049a4e8..39523306 100644 --- a/pylops/basicoperators/blockdiag.py +++ b/pylops/basicoperators/blockdiag.py @@ -1,5 +1,6 @@ __all__ = ["BlockDiag"] +import concurrent.futures as mt import multiprocessing as mp import numpy as np @@ -42,8 +43,8 @@ class BlockDiag(LinearOperator): :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel using - ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel using + ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 @@ -54,6 +55,10 @@ class BlockDiag(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
@@ -137,6 +142,7 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, + multiproc: bool = True, dtype: Optional[DTypeLike] = None, ) -> None: self.ops = ops @@ -167,12 +173,15 @@ def __init__( else: dimsd = (self.nops,) forceflat = True - # create pool for multiprocessing + # create pool for multithreading / multiprocessing + self.multiproc = multiproc self._nproc = nproc self.pool: Optional[mp.pool.Pool] = None if self.nproc > 1: - self.pool = mp.Pool(processes=nproc) - + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) self.inoutengine = inoutengine dtype = _get_dtype(ops) if dtype is None else np.dtype(dtype) clinear = all([getattr(oper, "clinear", True) for oper in self.ops]) @@ -252,16 +261,64 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.hstack(ys) return y + def _matvec_multithread(self, x: NDArray) -> NDArray: + if self.pool is None: + raise ValueError + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._matvec, x[self.mmops[iop] : self.mmops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + y = np.hstack(ys) + return y + + def _rmatvec_multithread(self, x: NDArray) -> NDArray: + if self.pool is None: + raise ValueError + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + y = np.hstack(ys) + return y + def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - y = self._matvec_multiproc(x) + if self.multiproc: + y = self._matvec_multiproc(x) + else: + y = self._matvec_multithread(x) return y def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - y = self._rmatvec_multiproc(x) + if self.multiproc: + y = self._rmatvec_multiproc(x) + else: + y = self._rmatvec_multithread(x) 
return y + + def close(self): + """Close the pool of workers used for multiprocessing + / multithreading. + """ + if self.pool is not None: + if self.multiproc: + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() + self.pool = None diff --git a/pylops/basicoperators/hstack.py b/pylops/basicoperators/hstack.py index 79b12ad4..d87f6854 100644 --- a/pylops/basicoperators/hstack.py +++ b/pylops/basicoperators/hstack.py @@ -1,5 +1,6 @@ __all__ = ["HStack"] +import concurrent.futures as mt import multiprocessing as mp import numpy as np @@ -47,8 +48,8 @@ class HStack(LinearOperator): :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel - using ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel + using ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 @@ -59,6 +60,10 @@ class HStack(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
@@ -139,6 +144,7 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, + multiproc: bool = True, dtype: Optional[str] = None, ) -> None: self.ops = ops @@ -161,12 +167,15 @@ def __init__( else: dimsd = (self.nops,) forceflat = True - # create pool for multiprocessing + # create pool for multithreading / multiprocessing + self.multiproc = multiproc self._nproc = nproc self.pool = None if self.nproc > 1: - self.pool = mp.Pool(processes=nproc) - + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) self.inoutengine = inoutengine dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype) clinear = all([getattr(oper, "clinear", True) for oper in self.ops]) @@ -241,16 +250,58 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.hstack(ys) return y + def _matvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._matvec, x[self.mmops[iop] : self.mmops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + + y = np.sum(ys, axis=0) + return y + + def _rmatvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [(oper._rmatvec, x) for iop, oper in enumerate(self.ops)], + ) + ) + y = np.hstack(ys) + return y + def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - y = self._matvec_multiproc(x) + if self.multiproc: + y = self._matvec_multiproc(x) + else: + y = self._matvec_multithread(x) return y def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - y = self._rmatvec_multiproc(x) + if self.multiproc: + y = self._rmatvec_multiproc(x) + else: + y = self._rmatvec_multithread(x) return y + + def close(self): + """Close the pool of workers used for multiprocessing / + multithreading. 
+ """ + if self.pool is not None: + if self.multiproc: + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() + self.pool = None diff --git a/pylops/basicoperators/vstack.py b/pylops/basicoperators/vstack.py index 3f4d50f0..d98ab65c 100644 --- a/pylops/basicoperators/vstack.py +++ b/pylops/basicoperators/vstack.py @@ -1,5 +1,6 @@ __all__ = ["VStack"] +import concurrent.futures as mt import multiprocessing as mp import numpy as np @@ -46,8 +47,8 @@ class VStack(LinearOperator): :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed in place of one or more operators. nproc : :obj:`int`, optional - Number of processes used to evaluate the N operators in parallel using - ``multiprocessing``. If ``nproc=1``, work in serial mode. + Number of processes/threads used to evaluate the N operators in parallel using + ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode. forceflat : :obj:`bool`, optional .. versionadded:: 2.2.0 @@ -58,6 +59,10 @@ class VStack(LinearOperator): Type of output vectors of `matvec` and `rmatvec. If ``None``, this is inferred directly from the input vectors. Note that this is ignored if ``nproc>1``. + multiproc : :obj:`bool`, optional + .. versionadded:: 2.6.0 + + Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``. dtype : :obj:`str`, optional Type of elements in input array. 
@@ -138,6 +143,7 @@ def __init__( nproc: int = 1, forceflat: bool = None, inoutengine: Optional[tuple] = None, + multiproc: bool = True, dtype: Optional[DTypeLike] = None, ) -> None: self.ops = ops @@ -160,11 +166,15 @@ def __init__( else: dims = (self.mops,) forceflat = True - # create pool for multiprocessing + # create pool for multithreading / multiprocessing + self.multiproc = multiproc self._nproc = nproc self.pool = None if self.nproc > 1: - self.pool = mp.Pool(processes=nproc) + if multiproc: + self.pool = mp.Pool(processes=nproc) + else: + self.pool = mt.ThreadPoolExecutor(max_workers=nproc) self.inoutengine = inoutengine dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype) clinear = all([getattr(oper, "clinear", True) for oper in self.ops]) @@ -239,16 +249,58 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray: y = np.sum(ys, axis=0) return y + def _matvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [(oper._matvec, x) for iop, oper in enumerate(self.ops)], + ) + ) + y = np.hstack(ys) + return y + + def _rmatvec_multithread(self, x: NDArray) -> NDArray: + ys = list( + self.pool.map( + lambda args: _matvec_rmatvec_map(*args), + [ + (oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]]) + for iop, oper in enumerate(self.ops) + ], + ) + ) + + y = np.sum(ys, axis=0) + return y + def _matvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._matvec_serial(x) else: - y = self._matvec_multiproc(x) + if self.multiproc: + y = self._matvec_multiproc(x) + else: + y = self._matvec_multithread(x) return y def _rmatvec(self, x: NDArray) -> NDArray: if self.nproc == 1: y = self._rmatvec_serial(x) else: - y = self._rmatvec_multiproc(x) + if self.multiproc: + y = self._rmatvec_multiproc(x) + else: + y = self._rmatvec_multithread(x) return y + + def close(self): + """Close the pool of workers used for multiprocessing / + multithreading. 
+ """ + if self.pool is not None: + if self.multiproc: + self.pool.close() + self.pool.join() + else: + self.pool.shutdown() + self.pool = None diff --git a/pylops/waveeqprocessing/blending.py b/pylops/waveeqprocessing/blending.py index cc1b54af..defce44d 100644 --- a/pylops/waveeqprocessing/blending.py +++ b/pylops/waveeqprocessing/blending.py @@ -4,6 +4,8 @@ "BlendingHalf", ] +from typing import Optional + import numpy as np from pylops import LinearOperator @@ -36,6 +38,9 @@ class BlendingContinuous(LinearOperator): shiftall : :obj:`bool`, optional Shift all shots together (``True``) or one at the time (``False``). Defaults to ``shiftall=False`` (original implementation), however ``shiftall=True`` should be preferred when ``nr`` is small. + nttot : :obj:`int` + Total number of time samples in blended data (if ``None``, computed internally + based on the the maximum ignition time in ``times``) dtype : :obj:`str`, optional Operator dtype name : :obj:`str`, optional @@ -43,9 +48,6 @@ class BlendingContinuous(LinearOperator): Attributes ---------- - ntot : :obj:`int` - Total number of time samples in blended data (based on the - the maximum ignition time in ``times``) PadOp : :obj:`pylops.basicoperators.Pad` Padding operator used to add one zero at the end of each shot gather to avoid boundary effects when shifting @@ -89,6 +91,7 @@ def __init__( dt: float, times: NDArray, shiftall: bool = False, + nttot: Optional[int] = None, dtype: DTypeLike = "float64", name: str = "B", ) -> None: @@ -99,7 +102,11 @@ def __init__( self.dt = dt self.times = times self.shiftall = shiftall - self.nttot = int(np.max(self.times) / self.dt + self.nt + 1) + self.nttot = ( + nttot + if nttot is not None + else int(np.max(self.times) / self.dt + self.nt + 1) + ) if not self.shiftall: # original implementation, where each source is shifted indipendently self.PadOp = Pad((self.nr, self.nt), ((0, 0), (0, 1)), dtype=self.dtype)