Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/plans/reflectometry/detector_mapping_alignment.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ The plans in this module expect:
{py:obj}`ibex_bluesky_core.devices.simpledae.PeriodSpecIntegralsReducer`.
- An angle map, as a `numpy` array of type `float64`, which has the same dimensionality as the set of selected detectors. This
maps each configured detector pixel to its angular position.
- An optional flood map, as a {external+scipp:py:obj}`scipp.Variable`. This should have a dimension label of "spectrum"
and have the same dimensionality as the set of selected detectors. This array may have variances. This is used to
normalise pixel efficiencies: raw counts are divided by the flood to get scaled counts. If no flood map is provided, no
normalisation will be performed.

## Angle scan

Expand Down
2 changes: 1 addition & 1 deletion src/ibex_bluesky_core/callbacks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def start(self, doc: RunStart) -> None:
# where a fit result can be returned before
# the QtAwareCallback has had a chance to process it.
self._subs.append(self._live_fit)
self._subs.append(LiveFitPlot(livefit=self._live_fit, ax=ax))
self._subs.append(LiveFitPlot(livefit=self._live_fit, ax=ax, num_points=5000))
else:
self._subs.append(self._live_fit)

Expand Down
34 changes: 27 additions & 7 deletions src/ibex_bluesky_core/callbacks/reflectometry/_det_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ class DetMapHeightScanLiveDispatcher(LiveDispatcher):
monitor spectrum used for normalization).
"""

def __init__(self, *, mon_name: str, det_name: str, out_name: str) -> None:
def __init__(
self, *, mon_name: str, det_name: str, out_name: str, flood: sc.Variable | None = None
) -> None:
"""Init."""
super().__init__()
self._mon_name = mon_name
self._det_name = det_name
self._out_name = out_name
self._flood = flood if flood is not None else sc.scalar(value=1, dtype="float64")

def event(self, doc: Event, **kwargs: dict[str, Any]) -> Event:
"""Process an event."""
Expand All @@ -62,6 +65,8 @@ def event(self, doc: Event, **kwargs: dict[str, Any]) -> Event:
det = sc.Variable(dims=["spectrum"], values=det_data, variances=det_data, dtype="float64")
mon = sc.Variable(dims=["spectrum"], values=mon_data, variances=mon_data, dtype="float64")

det /= self._flood

det_sum = det.sum()
mon_sum = mon.sum()

Expand Down Expand Up @@ -90,19 +95,31 @@ class DetMapAngleScanLiveDispatcher(LiveDispatcher):
"""

def __init__(
self, x_data: npt.NDArray[np.float64], x_name: str, y_in_name: str, y_out_name: str
self,
x_data: npt.NDArray[np.float64],
x_name: str,
y_in_name: str,
y_out_name: str,
flood: sc.Variable | None = None,
) -> None:
"""Init."""
super().__init__()
self.x_data = x_data
self.x_name = x_name

self.y_data = np.zeros_like(x_data)
self.y_data = sc.array(
dims=["spectrum"],
values=np.zeros_like(x_data),
variances=np.zeros_like(x_data),
dtype="float64",
)
self.y_in_name: str = y_in_name
self.y_out_name: str = y_out_name

self._descriptor_uid: str | None = None

self._flood = flood if flood is not None else sc.scalar(value=1, dtype="float64")

def descriptor(self, doc: EventDescriptor) -> None:
"""Process a descriptor."""
self._descriptor_uid = doc["uid"]
Expand All @@ -118,7 +135,10 @@ def event(self, doc: Event, **kwargs: dict[str, Any]) -> Event:
f"Shape of data ({data.shape} does not match x_data.shape ({self.x_data.shape})"
)

self.y_data += data
scaled_data = (
sc.array(dims=["spectrum"], values=data, variances=data, dtype="float64") / self._flood
)
self.y_data += scaled_data
return doc

def stop(self, doc: RunStop, _md: dict[str, Any] | None = None) -> None:
Expand All @@ -128,13 +148,13 @@ def stop(self, doc: RunStop, _md: dict[str, Any] | None = None) -> None:
return super().stop(doc, _md)

current_time = time.time()
for x, y in zip(self.x_data, self.y_data, strict=True):
for x, y in zip(self.x_data, self.y_data, strict=True): # type: ignore
logger.debug("DetMapAngleScanLiveDispatcher emitting event with x=%f, y=%f", x, y)
event = {
"data": {
self.x_name: x,
self.y_out_name: y,
self.y_out_name + "_err": np.sqrt(y + 0.5),
self.y_out_name: y.value,
self.y_out_name + "_err": np.sqrt(y.variance + 0.5),
},
"timestamps": {
self.x_name: current_time,
Expand Down
34 changes: 18 additions & 16 deletions src/ibex_bluesky_core/fitting.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Fitting methods used by the LiveFit callback."""

import math
from abc import ABC, abstractmethod
from collections.abc import Callable

Expand Down Expand Up @@ -102,6 +103,19 @@ def fit(cls, *args: int) -> FitMethod:
return FitMethod(model=cls.model(*args), guess=cls.guess(*args))


def _guess_cen_and_width(
x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]
) -> tuple[float, float]:
"""Guess the center and width of a positive peak."""
com, total_area = center_of_mass_of_area_under_curve(x, y)
y_range = np.max(y) - np.min(y)
if y_range == 0.0:
width = (np.max(x) - np.min(x)) / 2
else:
width = total_area / y_range
return com, width


class Gaussian(Fit):
"""Gaussian Fitting."""

Expand Down Expand Up @@ -130,8 +144,9 @@ def guess(
def guess(
x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]
) -> dict[str, lmfit.Parameter]:
mean = np.sum(x * y) / np.sum(y)
sigma = np.sqrt(np.sum(y * (x - mean) ** 2) / np.sum(y))
cen, width = _guess_cen_and_width(x, y)
sigma = width / math.sqrt(2 * math.pi) # From expected area under gaussian

background = np.min(y)

if np.max(y) > abs(np.min(y)):
Expand All @@ -142,7 +157,7 @@ def guess(
init_guess = {
"amp": lmfit.Parameter("amp", amp),
"sigma": lmfit.Parameter("sigma", sigma, min=0),
"x0": lmfit.Parameter("x0", mean),
"x0": lmfit.Parameter("x0", cen),
"background": lmfit.Parameter("background", background),
}

Expand Down Expand Up @@ -547,19 +562,6 @@ def guess(
return guess


def _guess_cen_and_width(
x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]
) -> tuple[float, float]:
"""Guess the center and width of a positive peak."""
com, total_area = center_of_mass_of_area_under_curve(x, y)
y_range = np.max(y) - np.min(y)
if y_range == 0.0:
width = (np.max(x) - np.min(x)) / 2
else:
width = total_area / y_range
return com, width


class TopHat(Fit):
"""Top Hat Fitting."""

Expand Down
89 changes: 80 additions & 9 deletions src/ibex_bluesky_core/plans/reflectometry/_det_map_align.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@

import logging
from collections.abc import Generator
from typing import TypedDict
from typing import Any, TypedDict

import bluesky.plan_stubs as bps
import bluesky.plans as bp
import lmfit
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import scipp as sc
from bluesky.preprocessors import subs_decorator
from bluesky.protocols import NamedMovable
from bluesky.utils import Msg
from lmfit import Parameter
from lmfit.model import ModelResult
from matplotlib.axes import Axes
from ophyd_async.plan_stubs import ensure_connected

from ibex_bluesky_core.callbacks import ISISCallbacks, LiveFit, LivePColorMesh
from ibex_bluesky_core.callbacks import ISISCallbacks, LiveFit, LivePColorMesh, PlotPNGSaver
from ibex_bluesky_core.callbacks.reflectometry import (
DetMapAngleScanLiveDispatcher,
DetMapHeightScanLiveDispatcher,
Expand All @@ -28,7 +31,7 @@
Waiter,
check_dae_strategies,
)
from ibex_bluesky_core.fitting import Gaussian
from ibex_bluesky_core.fitting import FitMethod, Gaussian
from ibex_bluesky_core.plan_stubs import call_qt_aware

__all__ = ["DetMapAlignResult", "angle_scan_plan", "height_and_angle_scan_plan"]
Expand All @@ -38,13 +41,17 @@


def _height_scan_callback_and_fit(
reducer: PeriodSpecIntegralsReducer, height: NamedMovable[float], ax: Axes
reducer: PeriodSpecIntegralsReducer,
height: NamedMovable[float],
ax: Axes,
flood: sc.Variable | None,
) -> tuple[DetMapHeightScanLiveDispatcher, LiveFit]:
intensity = "intensity"
height_scan_ld = DetMapHeightScanLiveDispatcher(
mon_name=reducer.mon_integrals.name,
det_name=reducer.det_integrals.name,
out_name=intensity,
flood=flood,
)

height_scan_callbacks = ISISCallbacks(
Expand All @@ -56,15 +63,32 @@ def _height_scan_callback_and_fit(
ax=ax,
live_fit_logger_postfix="_height",
human_readable_file_postfix="_height",
save_plot_to_png=False,
)
for cb in height_scan_callbacks.subs:
height_scan_ld.subscribe(cb)

def set_title_to_height_fit_result(name: str, doc: dict[str, Any]) -> None:
fit_result = height_scan_callbacks.live_fit.result
if fit_result is not None:
ax.set_title(
f"Best x0: {fit_result.params['x0'].value:.4f} "
f"+/- {fit_result.params['x0'].stderr:.4f}"
)
else:
ax.set_title("Fit failed") # pragma: no cover
plt.draw()

height_scan_ld.subscribe(set_title_to_height_fit_result, "stop")

return height_scan_ld, height_scan_callbacks.live_fit


def _angle_scan_callback_and_fit(
reducer: PeriodSpecIntegralsReducer, angle_map: npt.NDArray[np.float64], ax: Axes
reducer: PeriodSpecIntegralsReducer,
angle_map: npt.NDArray[np.float64],
ax: Axes,
flood: sc.Variable | None,
) -> tuple[DetMapAngleScanLiveDispatcher, LiveFit]:
angle_name = "angle"
counts_name = "counts"
Expand All @@ -74,24 +98,61 @@ def _angle_scan_callback_and_fit(
x_name=angle_name,
y_in_name=reducer.det_integrals.name,
y_out_name=counts_name,
flood=flood,
)

# The angle fit is prone to having skewed data which drags centre-of-mass
# guess away from real peak. For this data, it's actually better to guess the
# centre as the x-value corresponding to the max y-value. The guesses for background,
# sigma, and amplitude are left as-is.
def gaussian_max_y_guess(
x: npt.NDArray[np.float64], y: npt.NDArray[np.float64]
) -> dict[str, Parameter]:
guess = Gaussian().guess()(x, y)
max_y_idx = np.argmax(y)
guess["x0"] = lmfit.Parameter("x0", x[max_y_idx])
return guess

fit_method = FitMethod(
model=Gaussian.model(),
guess=gaussian_max_y_guess,
)

angle_scan_callbacks = ISISCallbacks(
x=angle_name,
y=counts_name,
yerr=f"{counts_name}_err",
fit=Gaussian().fit(),
fit=fit_method,
add_peak_stats=False,
add_table_cb=False,
ax=ax,
live_fit_logger_postfix="_angle",
human_readable_file_postfix="_angle",
live_fit_update_every=len(angle_map) - 1,
live_plot_update_on_every_event=False,
save_plot_to_png=False,
)
for cb in angle_scan_callbacks.subs:
angle_scan_ld.subscribe(cb)

def set_title_to_angle_fit_result(name: str, doc: dict[str, Any]) -> None:
fit_result = angle_scan_callbacks.live_fit.result
if fit_result is not None:
ax.set_title(
f"Best x0: {fit_result.params['x0'].value:.4f} "
f"+/- {fit_result.params['x0'].stderr:.4f}"
)
else:
ax.set_title("Fit failed") # pragma: no cover
plt.draw()

angle_scan_ld.subscribe(set_title_to_angle_fit_result, "stop")

# Make sure the Plot PNG saving happens *after* setting plot title to fit result...
angle_scan_ld.subscribe(
PlotPNGSaver(x=angle_name, y=counts_name, ax=ax, postfix="", output_dir=None)
)

return angle_scan_ld, angle_scan_callbacks.live_fit


Expand All @@ -110,6 +171,7 @@ def angle_scan_plan(
dae: SimpleDae[PeriodPerPointController, Waiter, PeriodSpecIntegralsReducer],
*,
angle_map: npt.NDArray[np.float64],
flood: sc.Variable | None = None,
) -> Generator[Msg, None, ModelResult | None]:
"""Reflectometry detector-mapping angle alignment plan.

Expand All @@ -121,6 +183,10 @@ def angle_scan_plan(
dae: The DAE to acquire from
angle_map: a numpy array, with the same shape as detectors,
describing the detector angle of each detector pixel
flood: Optional scipp data array describing a flood-correction.
This array should be aligned along a "spectrum" dimension; counts are
divided by this array before being used in fits. This is used to
normalise the intensities detected by each detector pixel.

"""
logger.info("Starting angle scan")
Expand All @@ -138,7 +204,7 @@ def angle_scan_plan(
yield from call_qt_aware(plt.show)
_, ax = yield from call_qt_aware(plt.subplots)

angle_cb, angle_fit = _angle_scan_callback_and_fit(reducer, angle_map, ax)
angle_cb, angle_fit = _angle_scan_callback_and_fit(reducer, angle_map, ax, flood=flood)

@subs_decorator(
[
Expand Down Expand Up @@ -180,6 +246,7 @@ def height_and_angle_scan_plan( # noqa PLR0913
num: int,
angle_map: npt.NDArray[np.float64],
rel: bool = False,
flood: sc.Variable | None = None,
) -> Generator[Msg, None, DetMapAlignResult]:
"""Reflectometry detector-mapping simultaneous height & angle alignment plan.

Expand All @@ -195,6 +262,10 @@ def height_and_angle_scan_plan( # noqa PLR0913
angle_map: a numpy array, with the same shape as detectors,
describing the detector angle of each detector pixel
rel: whether this scan should be absolute (default) or relative
flood: Optional scipp data array describing a flood-correction.
This array should be aligned along a "spectrum" dimension; counts are
divided by this array before being used in fits. This is used to
normalise the intensities detected by each detector pixel.

Returns:
A dictionary containing the fit results from gaussian height and angle fits.
Expand Down Expand Up @@ -224,8 +295,8 @@ def height_and_angle_scan_plan( # noqa PLR0913
cmap="hot",
shading="auto",
)
height_cb, height_fit = _height_scan_callback_and_fit(reducer, height, height_ax)
angle_cb, angle_fit = _angle_scan_callback_and_fit(reducer, angle_map, angle_ax)
height_cb, height_fit = _height_scan_callback_and_fit(reducer, height, height_ax, flood=flood)
angle_cb, angle_fit = _angle_scan_callback_and_fit(reducer, angle_map, angle_ax, flood=flood)

@subs_decorator(
[
Expand Down
Loading