20 changes: 16 additions & 4 deletions pylops/basicoperators/block.py
@@ -1,5 +1,6 @@
__all__ = ["Block"]

import concurrent.futures as mt
import multiprocessing as mp
from typing import Iterable, Optional

@@ -65,12 +66,16 @@ class Block(_Block):
Alternatively, :obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices
can be passed in place of one or more operators.
nproc : :obj:`int`, optional
Number of processes used to evaluate the N operators in parallel using
``multiprocessing``. If ``nproc=1``, work in serial mode.
Number of processes/threads used to evaluate the N operators in parallel using
``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode.
forceflat : :obj:`bool`, optional
.. versionadded:: 2.2.0

Force an array to be flattened after rmatvec.
multiproc : :obj:`bool`, optional
.. versionadded:: 2.6.0

Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``.
dtype : :obj:`str`, optional
Type of elements in input array.

@@ -148,10 +153,17 @@ def __init__(
ops: Iterable[Iterable[LinearOperator]],
nproc: int = 1,
forceflat: bool = None,
multiproc: bool = True,
dtype: Optional[DTypeLike] = None,
):
if nproc > 1:
self.pool = mp.Pool(processes=nproc)
if multiproc:
self.pool = mp.Pool(processes=nproc)
else:
self.pool = mt.ThreadPoolExecutor(max_workers=nproc)
super().__init__(
ops=ops, forceflat=forceflat, dtype=dtype, _args_VStack={"nproc": nproc}
ops=ops,
forceflat=forceflat,
dtype=dtype,
_args_VStack={"nproc": nproc, "multiproc": multiproc},
)
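
For reference, a minimal usage sketch of the new ``multiproc`` flag on ``Block``, based only on the constructor signature shown in this diff; the block sizes and input data below are made up, and ``MatrixMult`` is used purely as a convenient stand-in operator.

import numpy as np
from pylops import MatrixMult
from pylops.basicoperators import Block

# two small dense blocks arranged in a single block row (toy data)
A = MatrixMult(np.array([[1.0, 2.0], [3.0, 4.0]]))
B = MatrixMult(np.array([[5.0, 6.0], [7.0, 8.0]]))

# nproc>1 with multiproc=False should dispatch work to a ThreadPoolExecutor
# instead of a multiprocessing.Pool, per the constructor above
Op = Block([[A, B]], nproc=2, multiproc=False)
y = Op @ np.ones(Op.shape[1])  # forward pass over both blocks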
71 changes: 64 additions & 7 deletions pylops/basicoperators/blockdiag.py
@@ -1,5 +1,6 @@
__all__ = ["BlockDiag"]

import concurrent.futures as mt
import multiprocessing as mp

import numpy as np
@@ -42,8 +43,8 @@ class BlockDiag(LinearOperator):
:obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed
in place of one or more operators.
nproc : :obj:`int`, optional
Number of processes used to evaluate the N operators in parallel using
``multiprocessing``. If ``nproc=1``, work in serial mode.
Number of processes/threads used to evaluate the N operators in parallel using
``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode.
forceflat : :obj:`bool`, optional
.. versionadded:: 2.2.0

@@ -54,6 +55,10 @@ class BlockDiag(LinearOperator):
Type of output vectors of `matvec` and `rmatvec`. If ``None``, this is
inferred directly from the input vectors. Note that this is ignored
if ``nproc>1``.
multiproc : :obj:`bool`, optional
.. versionadded:: 2.6.0

Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``.
dtype : :obj:`str`, optional
Type of elements in input array.

@@ -137,6 +142,7 @@ def __init__(
nproc: int = 1,
forceflat: bool = None,
inoutengine: Optional[tuple] = None,
multiproc: bool = True,
dtype: Optional[DTypeLike] = None,
) -> None:
self.ops = ops
@@ -167,12 +173,15 @@ def __init__(
else:
dimsd = (self.nops,)
forceflat = True
# create pool for multiprocessing
# create pool for multithreading / multiprocessing
self.multiproc = multiproc
self._nproc = nproc
self.pool: Optional[mp.pool.Pool] = None
if self.nproc > 1:
self.pool = mp.Pool(processes=nproc)

if multiproc:
self.pool = mp.Pool(processes=nproc)
else:
self.pool = mt.ThreadPoolExecutor(max_workers=nproc)
self.inoutengine = inoutengine
dtype = _get_dtype(ops) if dtype is None else np.dtype(dtype)
clinear = all([getattr(oper, "clinear", True) for oper in self.ops])
@@ -252,16 +261,64 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray:
y = np.hstack(ys)
return y

def _matvec_multithread(self, x: NDArray) -> NDArray:
if self.pool is None:
raise ValueError("thread pool has not been initialized")
# apply each operator to its slice of the model vector and stack the outputs
ys = list(
self.pool.map(
lambda args: _matvec_rmatvec_map(*args),
[
(oper._matvec, x[self.mmops[iop] : self.mmops[iop + 1]])
for iop, oper in enumerate(self.ops)
],
)
)
y = np.hstack(ys)
return y

def _rmatvec_multithread(self, x: NDArray) -> NDArray:
if self.pool is None:
raise ValueError("thread pool has not been initialized")
# apply the adjoint of each operator to its slice of the data vector and stack the outputs
ys = list(
self.pool.map(
lambda args: _matvec_rmatvec_map(*args),
[
(oper._rmatvec, x[self.nnops[iop] : self.nnops[iop + 1]])
for iop, oper in enumerate(self.ops)
],
)
)
y = np.hstack(ys)
return y

def _matvec(self, x: NDArray) -> NDArray:
if self.nproc == 1:
y = self._matvec_serial(x)
else:
y = self._matvec_multiproc(x)
if self.multiproc:
y = self._matvec_multiproc(x)
else:
y = self._matvec_multithread(x)
return y

def _rmatvec(self, x: NDArray) -> NDArray:
if self.nproc == 1:
y = self._rmatvec_serial(x)
else:
y = self._rmatvec_multiproc(x)
if self.multiproc:
y = self._rmatvec_multiproc(x)
else:
y = self._rmatvec_multithread(x)
return y

def close(self):
"""Close the pool of workers used for multiprocessing
/ multithreading.
"""
if self.pool is not None:
if self.multiproc:
self.pool.close()
self.pool.join()
else:
self.pool.shutdown()
self.pool = None
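
As a quick check of the intended behavior, a hedged usage sketch of ``BlockDiag`` with the new thread-based path; the shapes and data are arbitrary, and thread-based parallelism mainly pays off when the individual operators release the GIL (e.g. heavy NumPy work).

import numpy as np
from pylops import BlockDiag, MatrixMult

# two diagonal blocks of different shapes (toy data)
ops = [MatrixMult(np.random.rand(3, 2)), MatrixMult(np.random.rand(4, 5))]

# multiproc=False selects the ThreadPoolExecutor branch added above
BD = BlockDiag(ops, nproc=2, multiproc=False)
y = BD @ np.ones(BD.shape[1])    # forward
x = BD.H @ np.ones(BD.shape[0])  # adjoint

BD.close()  # shuts the executor down (or closes/joins the mp.Pool when multiproc=True)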
65 changes: 58 additions & 7 deletions pylops/basicoperators/hstack.py
@@ -1,5 +1,6 @@
__all__ = ["HStack"]

import concurrent.futures as mt
import multiprocessing as mp

import numpy as np
@@ -47,8 +48,8 @@ class HStack(LinearOperator):
:obj:`numpy.ndarray` or :obj:`scipy.sparse` matrices can be passed
in place of one or more operators.
nproc : :obj:`int`, optional
Number of processes used to evaluate the N operators in parallel
using ``multiprocessing``. If ``nproc=1``, work in serial mode.
Number of processes/threads used to evaluate the N operators in parallel
using ``multiprocessing``/``concurrent.futures``. If ``nproc=1``, work in serial mode.
forceflat : :obj:`bool`, optional
.. versionadded:: 2.2.0

@@ -59,6 +60,10 @@ class HStack(LinearOperator):
Type of output vectors of `matvec` and `rmatvec`. If ``None``, this is
inferred directly from the input vectors. Note that this is ignored
if ``nproc>1``.
multiproc : :obj:`bool`, optional
.. versionadded:: 2.6.0

Use multiprocessing (``True``) or multithreading (``False``) when ``nproc>1``.
dtype : :obj:`str`, optional
Type of elements in input array.

@@ -139,6 +144,7 @@ def __init__(
nproc: int = 1,
forceflat: bool = None,
inoutengine: Optional[tuple] = None,
multiproc: bool = True,
dtype: Optional[str] = None,
) -> None:
self.ops = ops
@@ -161,12 +167,15 @@ def __init__(
else:
dimsd = (self.nops,)
forceflat = True
# create pool for multiprocessing
# create pool for multithreading / multiprocessing
self.multiproc = multiproc
self._nproc = nproc
self.pool = None
if self.nproc > 1:
self.pool = mp.Pool(processes=nproc)

if multiproc:
self.pool = mp.Pool(processes=nproc)
else:
self.pool = mt.ThreadPoolExecutor(max_workers=nproc)
self.inoutengine = inoutengine
dtype = _get_dtype(self.ops) if dtype is None else np.dtype(dtype)
clinear = all([getattr(oper, "clinear", True) for oper in self.ops])
@@ -241,16 +250,58 @@ def _rmatvec_multiproc(self, x: NDArray) -> NDArray:
y = np.hstack(ys)
return y

def _matvec_multithread(self, x: NDArray) -> NDArray:
# apply each operator to its slice of the model vector and sum the contributions
ys = list(
self.pool.map(
lambda args: _matvec_rmatvec_map(*args),
[
(oper._matvec, x[self.nnops[iop] : self.nnops[iop + 1]])
for iop, oper in enumerate(self.ops)
],
)
)
y = np.sum(ys, axis=0)
return y

def _rmatvec_multithread(self, x: NDArray) -> NDArray:
# apply the adjoint of every operator to the full data vector and stack the results
ys = list(
self.pool.map(
lambda args: _matvec_rmatvec_map(*args),
[(oper._rmatvec, x) for oper in self.ops],
)
)
y = np.hstack(ys)
return y

def _matvec(self, x: NDArray) -> NDArray:
if self.nproc == 1:
y = self._matvec_serial(x)
else:
y = self._matvec_multiproc(x)
if self.multiproc:
y = self._matvec_multiproc(x)
else:
y = self._matvec_multithread(x)
return y

def _rmatvec(self, x: NDArray) -> NDArray:
if self.nproc == 1:
y = self._rmatvec_serial(x)
else:
y = self._rmatvec_multiproc(x)
if self.multiproc:
y = self._rmatvec_multiproc(x)
else:
y = self._rmatvec_multithread(x)
return y

def close(self):
"""Close the pool of workers used for multiprocessing /
multithreading.
"""
if self.pool is not None:
if self.multiproc:
self.pool.close()
self.pool.join()
else:
self.pool.shutdown()
self.pool = None
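
Finally, a hedged sketch of the equivalent ``HStack`` usage with toy operators that share the same number of rows; the default ``multiproc=True`` keeps the previous multiprocessing behavior, while ``multiproc=False`` switches to threads.

import numpy as np
from pylops import HStack, MatrixMult

# operators with the same number of rows, stacked horizontally (toy data)
ops = [MatrixMult(np.random.rand(4, 3)), MatrixMult(np.random.rand(4, 2))]

H = HStack(ops, nproc=2, multiproc=False)  # thread-based parallel evaluation
y = H @ np.ones(H.shape[1])  # forward: per-block outputs are summed
x = H.H @ y                  # adjoint: per-block adjoints are stacked

H.close()  # release the worker threads (or processes when multiproc=True)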