From 2e1eda5c87e81968a578d5bb596bdf0a7f960106 Mon Sep 17 00:00:00 2001 From: xImoZA Date: Mon, 13 Oct 2025 15:35:16 +0300 Subject: [PATCH 1/2] feat(parallel): parallelize maximization step --- .../iterative/steps/maximization_step.py | 42 ++++++++++++++++--- .../iterative/steps/test_maximization_step.py | 17 ++++++++ 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py b/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py index f747d68b..6ecb0729 100644 --- a/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py +++ b/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py @@ -19,6 +19,7 @@ from typing import Callable, ClassVar import numpy as np +from joblib import Parallel, delayed from ....distributions import ContinuousDistribution from ....optimizers import Optimizer @@ -98,6 +99,36 @@ def _update_components_params(self, component: ContinuousDistribution, params: d param_values = list(params.values()) component.set_params_from_vector(param_names, param_values) + @staticmethod + def _optimization_worker( + state: PipelineState, + block: OptimizationBlock, + strategy: Callable, + optimizer: Optimizer, + ) -> tuple[int, dict[str, float]]: + """Helper method to execute the optimization strategy for a single block. + + Parameters + ---------- + state : PipelineState + The current state of the estimation pipeline. + block : OptimizationBlock + The configuration block defining which component and parameters to optimize. + strategy : Callable + The function implementing the specific maximization strategy to be used. + optimizer : Optimizer + The optimizer instance passed to the strategy function. + + Returns + ------- + tuple[int, dict[str, float]] + A tuple containing the component ID and a dictionary of its newly optimized parameters. 
+ """ + component = state.curr_mixture[block.component_id] + component_id, new_params = strategy(component, state, block, optimizer) + + return component_id, new_params + def run(self, state: PipelineState) -> PipelineState: """Executes the M-step. @@ -125,13 +156,14 @@ def run(self, state: PipelineState) -> PipelineState: state.error = error return state - results = [] curr_mixture = state.curr_mixture - for block in self.blocks: - strategy = self._strategies[block.maximization_strategy] - component_id, new_params = strategy(curr_mixture[block.component_id], state, block, self.optimizer) - results.append((component_id, new_params)) + results = Parallel(n_jobs=-1)( + delayed(MaximizationStep._optimization_worker)( + state, block, self._strategies[block.maximization_strategy], self.optimizer + ) + for block in self.blocks + ) for result in results: component_id, params = result diff --git a/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py b/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py index 9ce5f936..52103610 100644 --- a/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py +++ b/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py @@ -21,6 +21,11 @@ from rework_pysatl_mpest.optimizers import Optimizer +def serial_executor(generator): + """A mock executor for joblib that runs tasks sequentially in a single process.""" + return [func(*args, **kwargs) for func, args, kwargs in generator] + + @pytest.fixture def mock_optimizer(mocker: MockerFixture) -> Optimizer: """Fixture to create a mock Optimizer.""" @@ -138,6 +143,12 @@ def test_run_calls_correct_strategy_and_updates_params( target_component = mock_components[0] + # Mock parallel execution to run tasks sequentially for testing. 
+ mocker.patch( + "rework_pysatl_mpest.estimators.iterative.steps.maximization_step.Parallel", + return_value=serial_executor, + ) + step.run(pipeline_state) # mock_strategy was called once @@ -219,6 +230,12 @@ def test_run_processes_blocks_sequentially( new_strategies = {MaximizationStrategy.QFUNCTION: mock_strategy} mocker.patch.object(MaximizationStep, "_strategies", new_strategies) + # Mock parallel execution to run tasks sequentially for testing. + mocker.patch( + "rework_pysatl_mpest.estimators.iterative.steps.maximization_step.Parallel", + return_value=serial_executor, + ) + # Act step.run(pipeline_state) From 6b7733ed114fe173d56e9a92c2cd089d400a0e5e Mon Sep 17 00:00:00 2001 From: xImoZA Date: Mon, 13 Oct 2025 20:02:53 +0300 Subject: [PATCH 2/2] feat(parallel): switch to threading backend --- pyproject.toml | 1 + .../iterative/steps/maximization_step.py | 38 ++++++++++++------- .../iterative/steps/test_maximization_step.py | 2 +- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a0f12554..d53ef5cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ numpy = ">=2.2.3,<3.0.0" scikit-learn = ">=1.6.1,<2.0.0" scipy = ">=1.15.2,<2.0.0" seaborn = ">=0.13.2,<0.14.0" +joblib = "^1.5.2" [tool.poetry.group.dev.dependencies] pytest = "^8.3.4" diff --git a/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py b/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py index 6ecb0729..6725a086 100644 --- a/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py +++ b/rework_pysatl_mpest/estimators/iterative/steps/maximization_step.py @@ -9,17 +9,18 @@ Expectation-step. 
""" -__author__ = "Danil Totmyanin" +__author__ = "Danil Totmyanin, Aleksandra Ri" __copyright__ = "Copyright (c) 2025 PySATL project" __license__ = "SPDX-License-Identifier: MIT" +import os from collections.abc import Mapping, Sequence from types import MappingProxyType -from typing import Callable, ClassVar +from typing import Callable, ClassVar, Optional import numpy as np -from joblib import Parallel, delayed +from joblib import Parallel, delayed, parallel_backend from ....distributions import ContinuousDistribution from ....optimizers import Optimizer @@ -47,6 +48,11 @@ class MaximizationStep(PipelineStep): optimizer : Optimizer A numerical optimizer instance used to find the optimal parameters when an analytical solution is not available for a given strategy. + n_jobs : Optional[int], default=None + The number of jobs to run in parallel for the optimization tasks. An explicit integer is passed to joblib unchanged. + - ``None`` (default): The number of jobs is determined automatically. It will + be the minimum of the number of optimization blocks and the number of + available CPUs, but never less than 1. Attributes ---------- @@ -67,10 +73,18 @@ class MaximizationStep(PipelineStep): {MaximizationStrategy.QFUNCTION: q_function_strategy} ) - def __init__(self, blocks: Sequence[OptimizationBlock], optimizer: Optimizer): + def __init__(self, blocks: Sequence[OptimizationBlock], optimizer: Optimizer, n_jobs: Optional[int] = None): self.blocks = list(blocks) self.optimizer = optimizer + if n_jobs is not None: + self._n_jobs = n_jobs + else: + cpu_count = os.cpu_count() or 1 + default_jobs = min(len(self.blocks), cpu_count) + + self._n_jobs = default_jobs if default_jobs > 0 else 1 + @property def available_next_steps(self) -> list[type[PipelineStep]]: """list[type[PipelineStep]]: Defines the valid subsequent steps. 
@@ -99,11 +113,10 @@ def _update_components_params(self, component: ContinuousDistribution, params: d param_values = list(params.values()) component.set_params_from_vector(param_names, param_values) - @staticmethod def _optimization_worker( + self, state: PipelineState, block: OptimizationBlock, - strategy: Callable, optimizer: Optimizer, ) -> tuple[int, dict[str, float]]: """Helper method to execute the optimization strategy for a single block. @@ -114,8 +127,6 @@ def _optimization_worker( The current state of the estimation pipeline. block : OptimizationBlock The configuration block defining which component and parameters to optimize. - strategy : Callable - The function implementing the specific maximization strategy to be used. optimizer : Optimizer The optimizer instance passed to the strategy function. @@ -125,7 +136,7 @@ def _optimization_worker( A tuple containing the component ID and a dictionary of its newly optimized parameters. """ component = state.curr_mixture[block.component_id] - component_id, new_params = strategy(component, state, block, optimizer) + component_id, new_params = self._strategies[block.maximization_strategy](component, state, block, optimizer) return component_id, new_params @@ -158,12 +169,11 @@ def run(self, state: PipelineState) -> PipelineState: curr_mixture = state.curr_mixture - results = Parallel(n_jobs=-1)( - delayed(MaximizationStep._optimization_worker)( - state, block, self._strategies[block.maximization_strategy], self.optimizer + # Use threading backend: NumPy/SciPy release the GIL inside many native routines, so blocks can overlap that work without the data copying overhead of process-based backends. 
+ with parallel_backend("threading", n_jobs=self._n_jobs): + results = Parallel()( + delayed(self._optimization_worker)(state, block, self.optimizer) for block in self.blocks ) - for block in self.blocks - ) for result in results: component_id, params = result diff --git a/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py b/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py index 52103610..753b8d5d 100644 --- a/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py +++ b/rework_tests/unit/estimators/iterative/steps/test_maximization_step.py @@ -1,6 +1,6 @@ """Tests for MaximizationStep""" -__author__ = "Danil Totmyanin" +__author__ = "Danil Totmyanin, Aleksandra Ri" __copyright__ = "Copyright (c) 2025 PySATL project" __license__ = "SPDX-License-Identifier: MIT"