From 84cf039a292601559c9de74b3a08cf9c6e300535 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Wed, 1 Oct 2025 17:45:03 +0100 Subject: [PATCH 1/7] Stream documents to kafka --- pyproject.toml | 1 + src/ibex_bluesky_core/log.py | 2 +- src/ibex_bluesky_core/run_engine/__init__.py | 33 +++++++++++++++++++- tests/test_run_engine.py | 14 ++++++++- 4 files changed, 47 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 34d1fb65..5a3b963a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ classifiers = [ dependencies = [ "bluesky", # Bluesky framework + "bluesky-kafka", # Bluesky-kafka integration "ophyd-async[ca] == 0.12.3", # Device abstraction "matplotlib", # Plotting "lmfit", # Fitting diff --git a/src/ibex_bluesky_core/log.py b/src/ibex_bluesky_core/log.py index 4db30255..85945870 100644 --- a/src/ibex_bluesky_core/log.py +++ b/src/ibex_bluesky_core/log.py @@ -18,7 +18,7 @@ # Find the log directory, if already set in the environment, else use the default log_location = os.environ.get("IBEX_BLUESKY_CORE_LOGS", DEFAULT_LOG_FOLDER) -INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async"] +INTERESTING_LOGGER_NAMES = ["ibex_bluesky_core", "bluesky", "ophyd_async", "bluesky_kafka"] @cache diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index b5a6d68d..4dfa8d42 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -9,15 +9,21 @@ from typing import Any, cast import bluesky.preprocessors as bpp +import msgpack from bluesky.run_engine import RunEngine, RunEngineResult from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted from ibex_bluesky_core.callbacks import DocLoggingCallback from ibex_bluesky_core.preprocessors import add_rb_number_processor -__all__ = ["get_run_engine", "run_plan"] +__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"] +import os 
+import socket + +from bluesky_kafka import Publisher + from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler from ibex_bluesky_core.utils import is_matplotlib_backend_qt @@ -26,6 +32,21 @@ logger = logging.getLogger(__name__) +DEFAULT_KAFKA_BROKER = "livedata.isis.cclrc.ac.uk:31092" + + +def get_kafka_topic_name() -> str: + """Get the name of the bluesky kafka topic for this machine.""" + computer_name = os.environ.get("COMPUTERNAME", socket.gethostname()).upper() + computer_name = computer_name.upper() + if computer_name.startswith(("NDX", "NDH")): + name = computer_name[3:] + else: + name = computer_name + + return f"{name}_bluesky" + + class _DuringTask(DuringTask): def block(self, blocking_event: Event) -> None: """On windows, event.wait() on the main thread is not interruptible by a CTRL-C. @@ -103,6 +124,16 @@ def get_run_engine() -> RunEngine: log_callback = DocLoggingCallback() RE.subscribe(log_callback) + kafka_callback = Publisher( + topic=get_kafka_topic_name(), + bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), + key="doc", + producer_config={"enable.idempotence": True}, + flush_on_stop_doc=True, + serializer=msgpack.dumps, + ) + RE.subscribe(kafka_callback) + RE.register_command(CALL_SYNC_MSG_KEY, call_sync_handler) RE.register_command(CALL_QT_AWARE_MSG_KEY, call_qt_aware_handler) diff --git a/tests/test_run_engine.py b/tests/test_run_engine.py index 083aa45d..a0b74d23 100644 --- a/tests/test_run_engine.py +++ b/tests/test_run_engine.py @@ -3,6 +3,7 @@ import threading from collections.abc import Generator from typing import Any +from unittest import mock from unittest.mock import MagicMock import bluesky.plan_stubs as bps @@ -11,7 +12,7 @@ from bluesky.run_engine import RunEngineResult from bluesky.utils import Msg, RequestAbort, RunEngineInterrupted -from ibex_bluesky_core.run_engine 
import _DuringTask, get_run_engine, run_plan +from ibex_bluesky_core.run_engine import _DuringTask, get_kafka_topic_name, get_run_engine, run_plan from ibex_bluesky_core.version import version @@ -146,3 +147,14 @@ def plan(): result = run_plan(plan()) assert result.plan_result == "happy_path_result" assert result.exit_status == "success" + + +def test_get_kafka_topic_name(): + with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="FOO"): + assert get_kafka_topic_name() == "FOO_bluesky" + + with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDXBAR"): + assert get_kafka_topic_name() == "BAR_bluesky" + + with mock.patch("ibex_bluesky_core.run_engine.os.environ.get", return_value="NDHBAZ"): + assert get_kafka_topic_name() == "BAZ_bluesky" From fde0bb879cc856fea000f448377c0b2c7f6e6122 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Wed, 1 Oct 2025 18:12:13 +0100 Subject: [PATCH 2/7] Add docs --- doc/dev/kafka.md | 14 ++++++++++++++ src/ibex_bluesky_core/run_engine/__init__.py | 1 - 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 doc/dev/kafka.md diff --git a/doc/dev/kafka.md b/doc/dev/kafka.md new file mode 100644 index 00000000..9377cc7e --- /dev/null +++ b/doc/dev/kafka.md @@ -0,0 +1,14 @@ +# Kafka + +`ibex_bluesky_core` uses [the `bluesky-kafka` library](https://github.com/bluesky/bluesky-kafka) to send documents +emitted by the `RunEngine` to kafka. The kafka callback is automatically added by +{py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, and so no user configuration is required - the callback is always +enabled. + +Documents are encoded using [the `msgpack` format](https://msgpack.org/index.html). + +The kafka broker to send to can be controlled using the `IBEX_BLUESKY_CORE_KAFKA_BROKER` environment variable, if +an instrument needs to override the default. The kafka topic will be `<INSTRUMENT>_bluesky`, where `INSTRUMENT` is the +instrument name with any NDX or NDH prefix stripped.
+ +The message key will always be `doc` for bluesky documents; specifying a non-null key enforces message ordering. diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index 4dfa8d42..d3ade38e 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -129,7 +129,6 @@ def get_run_engine() -> RunEngine: bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), key="doc", producer_config={"enable.idempotence": True}, - flush_on_stop_doc=True, serializer=msgpack.dumps, ) RE.subscribe(kafka_callback) From 9d63d121bc6de20813b02d180ed17cb358bd7fa9 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Wed, 1 Oct 2025 18:18:35 +0100 Subject: [PATCH 3/7] Disable idempotence Don't kill a scan if kafka fails... --- src/ibex_bluesky_core/run_engine/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index d3ade38e..999e5023 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -128,7 +128,6 @@ def get_run_engine() -> RunEngine: topic=get_kafka_topic_name(), bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), key="doc", - producer_config={"enable.idempotence": True}, serializer=msgpack.dumps, ) RE.subscribe(kafka_callback) From a1f678e5f0a0f47217d8ed43500cda2dac25c5e1 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 2 Oct 2025 20:56:45 +0100 Subject: [PATCH 4/7] Re-enable idempotence --- src/ibex_bluesky_core/run_engine/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index 999e5023..5a43cc27 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -129,6 +129,7 @@ def get_run_engine() -> 
RunEngine: bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), key="doc", serializer=msgpack.dumps, + producer_config={"enable.idempotence": True}, ) RE.subscribe(kafka_callback) From 503418c8cfdd37cd575c0ca02b05378d28e28cb0 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Fri, 3 Oct 2025 11:01:28 +0100 Subject: [PATCH 5/7] Add ADR --- .../009-kafka-streaming.md | 41 +++++++++++++++++++ pyproject.toml | 5 ++- src/ibex_bluesky_core/run_engine/__init__.py | 20 ++++----- 3 files changed, 52 insertions(+), 14 deletions(-) create mode 100644 doc/architectural_decisions/009-kafka-streaming.md diff --git a/doc/architectural_decisions/009-kafka-streaming.md b/doc/architectural_decisions/009-kafka-streaming.md new file mode 100644 index 00000000..f30152cc --- /dev/null +++ b/doc/architectural_decisions/009-kafka-streaming.md @@ -0,0 +1,41 @@ +# 9. Kafka streaming + +## Status + +Current + +## Context + +Many facilities stream bluesky documents to an event-bus for consumption by out-of-process listeners. +Event buses used for this purpose at other facilities include ZeroMQ, RabbitMQ, Kafka, Redis, NATS, and +others. + +The capability this provides is that callbacks can be run in different processes or on other computers, +without holding up or interfering with the local `RunEngine`. Other groups at ISIS have expressed some +interest in being able to subscribe to bluesky documents. + +## Decision + +- We will stream our messages to Kafka, as opposed to some other message bus. This is because we already +have Kafka infrastructure available for other purposes (e.g. event data & sample-environment data). +- At the time of writing, we will not **depend** on Kafka for anything critical. This is because the +central Kafka instance is not currently considered "reliable" in an experiment controls context. However, +streaming the documents will allow testing to be done. 
Kafka will eventually be deployed in a "reliable" +way accessible to each instrument. +- We will encode messages from bluesky using `msgpack` (with the `msgpack-numpy` extension), because: + - It is the default encoder used by the upstream `bluesky-kafka` integration + - It is a schema-less encoder, meaning we do not have to write/maintain fixed schemas for all the +documents allowed by `event-model` + - It has reasonable performance in terms of encoding speed and message size + - `msgpack` is very widely supported in a range of programming languages +- Kafka brokers will be configurable via an environment variable, `IBEX_BLUESKY_CORE_KAFKA_BROKER` + +```{note} +Wherever Kafka is mentioned above, the actual implementation may be a Kafka-like (e.g. RedPanda). +``` + +## Justification & Consequences + +We will stream bluesky documents to Kafka, encoded using `msgpack-numpy`. + +At the time of writing this is purely to enable testing, and will not be used for "production" workflows. diff --git a/pyproject.toml b/pyproject.toml index 5a3b963a..2a993fe9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,12 +43,13 @@ dependencies = [ "bluesky", # Bluesky framework "bluesky-kafka", # Bluesky-kafka integration "ophyd-async[ca] == 0.12.3", # Device abstraction - "matplotlib", # Plotting "lmfit", # Fitting - "scipy", # Definitions of erf/erfc functions + "matplotlib", # Plotting + "msgpack-numpy", # Encoding kafka messages "numpy", # General array support "orjson", # json module which handles numpy arrays transparently "scipp", # support for arrays with variances/units + "scipy", # Definitions of erf/erfc functions "scippneutron", # neutron-specific utilities for scipp "typing-extensions", # TypeVar with default-arg support "tzdata", # Windows timezone support diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index 5a43cc27..086626d8 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ 
b/src/ibex_bluesky_core/run_engine/__init__.py @@ -3,32 +3,28 @@ import asyncio import functools import logging +import os +import socket from collections.abc import Generator from functools import cache from threading import Event, Lock from typing import Any, cast import bluesky.preprocessors as bpp -import msgpack +import msgpack_numpy from bluesky.run_engine import RunEngine, RunEngineResult from bluesky.utils import DuringTask, Msg, RunEngineControlException, RunEngineInterrupted - -from ibex_bluesky_core.callbacks import DocLoggingCallback -from ibex_bluesky_core.preprocessors import add_rb_number_processor - -__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"] - - -import os -import socket - from bluesky_kafka import Publisher +from ibex_bluesky_core.callbacks import DocLoggingCallback from ibex_bluesky_core.plan_stubs import CALL_QT_AWARE_MSG_KEY, CALL_SYNC_MSG_KEY +from ibex_bluesky_core.preprocessors import add_rb_number_processor from ibex_bluesky_core.run_engine._msg_handlers import call_qt_aware_handler, call_sync_handler from ibex_bluesky_core.utils import is_matplotlib_backend_qt from ibex_bluesky_core.version import version +__all__ = ["get_kafka_topic_name", "get_run_engine", "run_plan"] + logger = logging.getLogger(__name__) @@ -128,7 +124,7 @@ def get_run_engine() -> RunEngine: topic=get_kafka_topic_name(), bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), key="doc", - serializer=msgpack.dumps, + serializer=msgpack_numpy.dumps, producer_config={"enable.idempotence": True}, ) RE.subscribe(kafka_callback) From 13271367460635e3a268411f206ca67fc6b06418 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Mon, 6 Oct 2025 15:46:15 +0100 Subject: [PATCH 6/7] Suppress kafka log --- src/ibex_bluesky_core/run_engine/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ibex_bluesky_core/run_engine/__init__.py b/src/ibex_bluesky_core/run_engine/__init__.py index 
086626d8..077583ea 100644 --- a/src/ibex_bluesky_core/run_engine/__init__.py +++ b/src/ibex_bluesky_core/run_engine/__init__.py @@ -125,7 +125,11 @@ def get_run_engine() -> RunEngine: bootstrap_servers=os.environ.get("IBEX_BLUESKY_CORE_KAFKA_BROKER", DEFAULT_KAFKA_BROKER), key="doc", serializer=msgpack_numpy.dumps, - producer_config={"enable.idempotence": True}, + producer_config={ + "enable.idempotence": True, + "log_level": 0, + "log.connection.close": False, + }, ) RE.subscribe(kafka_callback) From 7944746a1a2a12f52623adf55d9cd284cc73ea0b Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Tue, 7 Oct 2025 21:37:41 +0100 Subject: [PATCH 7/7] Mention we considered json_json.fbs --- .../009-kafka-streaming.md | 17 +++++++++++++++++ doc/dev/kafka.md | 3 ++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/doc/architectural_decisions/009-kafka-streaming.md b/doc/architectural_decisions/009-kafka-streaming.md index f30152cc..0d91c27e 100644 --- a/doc/architectural_decisions/009-kafka-streaming.md +++ b/doc/architectural_decisions/009-kafka-streaming.md @@ -34,6 +34,23 @@ documents allowed by `event-model` Wherever Kafka is mentioned above, the actual implementation may be a Kafka-like (e.g. RedPanda). ``` +### Alternatives considered + +Encoding bluesky documents into JSON and then wrapping them in the +[`json_json.fbs` flatbuffers schema](https://github.com/ess-dmsc/streaming-data-types/blob/58793c3dfa060f60b4a933bc085f831744e43f17/schemas/json_json.fbs) +was considered. + +We chose `msgpack` instead of json strings + flatbuffers because: +- It is more standard in the bluesky community (e.g. it is the default used in `bluesky-kafka`) +- Bluesky events will be streamed to a dedicated topic, which is unlikely to be confused with data +using any other schema. 
+ +Performance/storage impacts are unlikely to be noticeable for bluesky documents, but nonetheless: +- `msgpack`-encoded documents are 30-40% smaller than `json` + flatbuffers +for a typical bluesky document +- `msgpack`-encoding messages is ~5x faster than `json` + flatbuffers encoding +for a typical bluesky document. + ## Justification & Consequences We will stream bluesky documents to Kafka, encoded using `msgpack-numpy`. diff --git a/doc/dev/kafka.md b/doc/dev/kafka.md index 9377cc7e..71c03939 100644 --- a/doc/dev/kafka.md +++ b/doc/dev/kafka.md @@ -5,7 +5,8 @@ emitted by the `RunEngine` to kafka. The kafka callback is automatically added b {py:obj}`ibex_bluesky_core.run_engine.get_run_engine`, and so no user configuration is required - the callback is always enabled. -Documents are encoded using [the `msgpack` format](https://msgpack.org/index.html). +Documents are encoded using [the `msgpack` format](https://msgpack.org/index.html) - using the `msgpack-numpy` library +to also handle numpy arrays transparently. The kafka broker to send to can be controlled using the `IBEX_BLUESKY_CORE_KAFKA_BROKER` environment variable, if an instrument needs to override the default. The kafka topic will be `_bluesky`, where `INSTRUMENT` is the