From 89517123656e2b7af284fb315dd4c2f6c0410ea2 Mon Sep 17 00:00:00 2001 From: michaelj094 Date: Fri, 9 Jan 2026 17:42:35 +0000 Subject: [PATCH] Add Pythonic Pipeline pattern using functional composition --- README.md | 1 + patterns/behavioral/pipeline.py | 98 +++++++++++++++++++++++++++++++ tests/behavioral/test_pipeline.py | 19 ++++++ 3 files changed, 118 insertions(+) create mode 100644 patterns/behavioral/pipeline.py create mode 100644 tests/behavioral/test_pipeline.py diff --git a/README.md b/README.md index 4d12a2f1..9b1eef68 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ __Behavioral Patterns__: | [mediator](patterns/behavioral/mediator.py) | an object that knows how to connect other objects and act as a proxy | | [memento](patterns/behavioral/memento.py) | generate an opaque token that can be used to go back to a previous state | | [observer](patterns/behavioral/observer.py) | provide a callback for notification of events/changes to data | +| [pipeline](patterns/behavioral/pipeline.py) | compose data processing stages | | [publish_subscribe](patterns/behavioral/publish_subscribe.py) | a source syndicates events/data to 0+ registered listeners | | [registry](patterns/behavioral/registry.py) | keep track of all subclasses of a given class | | [servant](patterns/behavioral/servant.py) | provide common functionality to a group of classes without using inheritance | diff --git a/patterns/behavioral/pipeline.py b/patterns/behavioral/pipeline.py new file mode 100644 index 00000000..b4a46f48 --- /dev/null +++ b/patterns/behavioral/pipeline.py @@ -0,0 +1,98 @@ +""" +Pipeline / Functional Pipeline (Pythonic) + +This implements the Pipeline pattern using a functional approach, +where each stage is a callable transforming an iterable. The goal +is to demonstrate a Pythonic alternative to class-based pipelines +using generators and composition. + +TL;DR: + Build data processing flows by chaining small functions. + In Python, pipelines are best expressed with callables + iterables + (often generators), not heavy class hierarchies. + +References: + - https://martinfowler.com/articles/collection-pipeline/ + - https://en.wikipedia.org/wiki/Pipeline_(software) + + +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable, Iterable, Iterator, TypeVar + +T = TypeVar("T") +U = TypeVar("U") + +# A stage transforms an Iterable[T] into an Iterable[U]. +Stage = Callable[[Iterable[T]], Iterable[U]] + + +def compose(*stages: Stage) -> Stage: + """Compose stages left-to-right into a single stage.""" + def _composed(data: Iterable): + out = data + for stage in stages: + out = stage(out) + return out + return _composed + + +@dataclass(frozen=True) +class Pipeline: + """Convenience wrapper around composed stages.""" + stages: tuple[Stage, ...] + + def __call__(self, data: Iterable[T]) -> Iterable: + fn = compose(*self.stages) + return fn(data) + + def then(self, stage: Stage) -> "Pipeline": + """Return a new Pipeline with an extra stage appended.""" + return Pipeline(self.stages + (stage,)) + + + +def map_stage(fn: Callable[[T], U]) -> Stage: + """Create a mapping stage.""" + def _stage(data: Iterable[T]) -> Iterator[U]: + for item in data: + yield fn(item) + return _stage + + +def filter_stage(pred: Callable[[T], bool]) -> Stage: + """Create a filtering stage.""" + def _stage(data: Iterable[T]) -> Iterator[T]: + for item in data: + if pred(item): + yield item + return _stage + + +def take(n: int) -> Stage: + """Take the first n items from the stream.""" + if n < 0: + raise ValueError("n must be >= 0") + + def _stage(data: Iterable[T]) -> Iterator[T]: + count = 0 + for item in data: + if count >= n: + break + yield item + count += 1 + return _stage + + +if __name__ == "__main__": + # Example: numbers -> keep evens -> square -> take first 3 + p = Pipeline(( + filter_stage(lambda x: x % 2 == 0), + map_stage(lambda x: x * x), + take(3), + )) + + print(list(p(range(100)))) # [0, 4, 16] diff --git a/tests/behavioral/test_pipeline.py b/tests/behavioral/test_pipeline.py new file mode 100644 index 00000000..82369c67 --- /dev/null +++ b/tests/behavioral/test_pipeline.py @@ -0,0 +1,19 @@ +from patterns.behavioral.pipeline import Pipeline, filter_stage, map_stage, take + + +def test_pipeline_composes_stages_lazily(): + p = Pipeline(( + filter_stage(lambda x: x % 2 == 1), + map_stage(lambda x: x + 10), + take(4), + )) + + assert list(p(range(100))) == [11, 13, 15, 17] + + +def test_take_rejects_negative(): + try: + take(-1) + assert False, "Expected ValueError" + except ValueError: + assert True