diff --git a/pysp2/util/normalized_derivative_method.py b/pysp2/util/normalized_derivative_method.py index bf7553b..3cd2257 100644 --- a/pysp2/util/normalized_derivative_method.py +++ b/pysp2/util/normalized_derivative_method.py @@ -6,6 +6,7 @@ from dataclasses import dataclass from typing import Optional, Union + """ Moteki & Kondo (2008) Normalized Derivative Method Module contains functions to compute the normalized derivative of scattering signals @@ -98,7 +99,6 @@ def central_difference(S, num_records=None, normalize=True, baseline_to_zero=Tru return xr.Dataset(dSdt) - def plot_normalized_derivative(S, ds, record_no, chn=0, plot_scattering_signal=False): """ Plots the normalized derivative of the scattering signal for a given record_no and channel. @@ -176,6 +176,80 @@ def plot_normalized_derivative(S, ds, record_no, chn=0, plot_scattering_signal=F return ax +def _resolve_peakfit_window( + peak_ds: xr.Dataset, + *, + event_index: int, + ch: Union[int, str], + event_dim: str = "event_index", + min_start: int = 15, + width_metric: str = "fwhm", # "fwhm" or "fwtm" + n_samples: Optional[int] = None, +) -> tuple[int, int, dict]: + """ + Build the MLE fit window from PySP2 peak-fit outputs. + + Left bound: + PkStart_ch{peak_ch} + + Right bound: + PkStart_ch{peak_ch} + width + + where width is either: + - PkFWHM_ch{peak_ch}, or + - PkFWHM_ch{peak_ch} converted to FWTM if requested + + Returns + ------- + fit_start, fit_stop, info + """ + # --- Normalize channel input --- + if isinstance(ch, str): + # Expect something like "Data_ch0" + if "ch" not in ch: + raise ValueError(f"Cannot infer channel from {ch}") + ch_num = int(ch.split("ch")[-1]) + else: + ch_num = int(ch) + + start_var = f"PkStart_ch{ch_num}" + width_var = f"PkFWHM_ch{ch_num}" + + if start_var not in peak_ds: + raise ValueError(f"{start_var} not found in dataset") + if width_var not in peak_ds: + raise ValueError(f"{width_var} not found in dataset") + + pk_start = float(peak_ds[start_var].isel({event_dim: event_index}).values) + pk_fwhm = float(peak_ds[width_var].isel({event_dim: event_index}).values) + + if not np.isfinite(pk_start): + raise ValueError(f"Invalid peak start: {pk_start}") + if not np.isfinite(pk_fwhm) or pk_fwhm <= 0: + raise ValueError(f"Invalid peak width: {pk_fwhm}") + + if width_metric == "fwhm": + width = pk_fwhm + elif width_metric == "fwtm": + width = pk_fwhm * np.sqrt(np.log(10.0) / np.log(2.0)) + else: + raise ValueError("width_metric must be 'fwhm' or 'fwtm'") + + fit_start = max(min_start, int(np.floor(pk_start))) + fit_stop = int(np.ceil(fit_start + width)) + + if n_samples is not None: + fit_stop = min(fit_stop, n_samples) + + if fit_stop <= fit_start + 1: + raise ValueError(f"Window too small: {fit_start}, {fit_stop}") + + return fit_start, fit_stop, { + "ch": ch_num, + "pk_start": pk_start, + "pk_fwhm": pk_fwhm, + "width_metric": width_metric, + } @dataclass(frozen=True) class MLEConfig: @@ -243,7 +317,6 @@ def _to_dataarray( raise TypeError(f"{name} must be an xarray DataArray or Dataset.") - def _moteki_kondo_subset_statistics( yk: np.ndarray, sk: np.ndarray, @@ -316,13 +389,14 @@ def mle_tau_moteki_kondo( norm_deriv: Union[xr.DataArray, xr.Dataset], p: int, *, - ch: Optional[str] = None, + ch: str = None, event_index: int, event_dim: str = "event_index", S_sample_dim: Optional[str] = None, y_sample_dim: Optional[str] = None, tau_grid: Optional[Union[np.ndarray, xr.DataArray]] = None, - k_end: Optional[int] = None, + min_start: int = 15, + width_metric: str = "fwhm", config: Optional[MLEConfig] = None, ) -> xr.DataArray: """ @@ -336,7 +410,7 @@ def mle_tau_moteki_kondo( Normalized derivative. p : int Number of consecutive points in each k-subset. - ch : str, optional + ch : str Variable to select when S and/or norm_deriv are Datasets. Required if a Dataset contains multiple variables and no unique choice exists. event_index : int @@ -349,65 +423,55 @@ def mle_tau_moteki_kondo( Sample dimension in norm_deriv. tau_grid : 1D array-like, optional Global tau grid for all subsets. - k_end : int, optional - Largest starting k. + min_start : int + Minimum allowed start index to exclude unusable early samples. + width_metric : str + "fwhm" or "fwtm" for defining peak width. config : MLEConfig Calibration / noise / grid settings. """ if config is None: raise ValueError("config must be provided.") + S_original = S - # Convert datasets to the selected DataArrays. + # Convert datasets to DataArrays S = _to_dataarray(S, "S", ch=ch) norm_deriv = _to_dataarray(norm_deriv, "norm_deriv", ch=ch) - # The method requires one event axis and one sample axis. - if event_dim not in S.dims: - raise ValueError(f"{event_dim!r} not found in S.dims={S.dims}") - if event_dim not in norm_deriv.dims: - raise ValueError(f"{event_dim!r} not found in norm_deriv.dims={norm_deriv.dims}") + # Ensure event dimension exists + if event_dim not in S.dims or event_dim not in norm_deriv.dims: + raise ValueError(f"{event_dim!r} must be present in both inputs.") - # Infer the sample dimension if the user did not specify it. + # Infer sample dimensions if S_sample_dim is None: - s_non_event_dims = [d for d in S.dims if d != event_dim] - if len(s_non_event_dims) != 1: - raise ValueError( - f"Could not infer S sample dim. Non-event dims in S: {s_non_event_dims}" - ) - S_sample_dim = s_non_event_dims[0] - + S_sample_dim = [d for d in S.dims if d != event_dim][0] if y_sample_dim is None: - y_non_event_dims = [d for d in norm_deriv.dims if d != event_dim] - if len(y_non_event_dims) != 1: - raise ValueError( - f"Could not infer norm_deriv sample dim. Non-event dims in norm_deriv: {y_non_event_dims}" - ) - y_sample_dim = y_non_event_dims[0] + y_sample_dim = [d for d in norm_deriv.dims if d != event_dim][0] - # Rename the sample dimensions to a common internal name. + # Standardize dimensions S_std = S.rename({S_sample_dim: "sample"}) y_std = norm_deriv.rename({y_sample_dim: "sample"}) - - # Align the arrays so the same event/sample positions are used in both inputs. S_std, y_std = xr.align(S_std, y_std, join="inner") - if event_index < 0 or event_index >= S_std.sizes[event_dim]: - raise ValueError( - f"event_index must be in [0, {S_std.sizes[event_dim] - 1}], got {event_index}" - ) - n_samples = S_std.sizes["sample"] - if p < 2 or p > n_samples: - raise ValueError(f"p must be in [2, {n_samples}], got {p}") + # --- Peak-based fit window --- + # This ensures the MLE only evaluates subsets within the Gaussian peak region. + fit_start, fit_stop, _ = _resolve_peakfit_window( + S_original, + event_index=event_index, + ch=ch, + event_dim=event_dim, + min_start=min_start, + width_metric=width_metric, + n_samples=n_samples, + ) - if k_end is None: - k_end = n_samples - p - if k_end < 0 or k_end > n_samples - p: - raise ValueError(f"k_end must be in [0, {n_samples - p}], got {k_end}") + # Validate subset length + if p < 2 or p > (fit_stop - fit_start): + raise ValueError(f"Invalid p={p} for window [{fit_start}, {fit_stop}).") - # Optional tau grid for the 1D grid search in tau. - # Moteki & Kondo determine tau numerically by maximizing L_k(tau) [Appendix A.5]. + # Optional tau grid for global search if tau_grid is not None: tau_grid_np = np.asarray( tau_grid.data if isinstance(tau_grid, xr.DataArray) else tau_grid, @@ -418,15 +482,13 @@ def mle_tau_moteki_kondo( else: tau_grid_np = None - # Parameters from Appendix A. + # Moteki & Kondo parameters h = float(config.h) sigma_bar = float(config.sigma_bar) delta_sigma = float(config.delta_sigma) A1, A2, A3 = float(config.A1), float(config.A2), float(config.A3) - # Time axis used in the fit. - # Here we use physical time spacing h so tk is in seconds (or whatever unit h uses). - # This must match sigma_bar and delta_sigma units. + # Time axis (must match instrument sampling) t = np.arange(n_samples, dtype=float) * h if h <= 0: @@ -436,102 +498,70 @@ def mle_tau_moteki_kondo( if delta_sigma < 0: raise ValueError("config.delta_sigma must be >= 0.") - def _tau_hat_for_one_event(s_event: np.ndarray, y_event: np.ndarray) -> np.ndarray: + def _tau_hat_for_one_event(s_event, y_event): """ - For one event, scan all k-subsets of length p and return tau_hat(k). - """ - tau_hat = np.full(k_end + 1, np.nan, dtype=float) - - # Skip events with missing values. - if not (np.all(np.isfinite(s_event)) and np.all(np.isfinite(y_event))): - return tau_hat + For one event, scan all k-subsets of length p within the peak window. - for k in range(k_end + 1): - # Consecutive p-point subset starting at k. - # This is the subset over which Moteki & Kondo search for the leading-edge - # segment that best matches I'/I [Appendix A.5]. - yk = y_event[k : k + p] - sk = s_event[k : k + p] - tk = t[k : k + p] + The goal is to find the subset that best matches the expected + I'/I linear behavior for a Gaussian beam [Appendix A.5]. + """ + k_values = np.arange(fit_start, fit_stop - p + 1) + tau_hat = np.full(k_values.size, np.nan) - if not (np.all(np.isfinite(yk)) and np.all(np.isfinite(sk))): - continue + for i, k in enumerate(k_values): + # Consecutive p-point subset starting at k + yk = y_event[k:k+p] + sk = s_event[k:k+p] + tk = t[k:k+p] - # If the user did not supply a global tau grid, build a local grid for this k. + # Build local tau grid if not provided if tau_grid_np is None: - span = float(tk[-1] - tk[0]) + span = tk[-1] - tk[0] margin = config.grid_margin * (span + h) grid = np.linspace(tk[0] - margin, tk[-1] + margin, config.grid_size) else: grid = tau_grid_np - # Grid-search maximization of L_k(tau) [Appendix A.5]. + # Grid-search maximization of likelihood [Appendix A.5] best_ll = -np.inf best_tau = np.nan + for tau_cand in grid: - _, _, _, d2 = _moteki_kondo_subset_statistics( - yk, - sk, - tk, - float(tau_cand), - h=h, - sigma_bar=sigma_bar, - delta_sigma=delta_sigma, - A1=A1, - A2=A2, - A3=A3, + ybar, Sigma, L, d2 = _moteki_kondo_subset_statistics( + yk, sk, tk, tau_cand, + h=h, sigma_bar=sigma_bar, delta_sigma=delta_sigma, + A1=A1, A2=A2, A3=A3 ) - if d2 is None: - ll = -np.inf - else: - # Reconstruct the log-likelihood from the same subset statistics. - # Eq. (A.9): log L = -1/2 * [p log(2pi) + log|Sigma| + d^2] - # Since _moteki_kondo_subset_statistics does not return log|Sigma|, - # we compute likelihood separately here for the grid-search step. - ybar, Sigma, L, d2 = _moteki_kondo_subset_statistics( - yk, - sk, - tk, - float(tau_cand), - h=h, - sigma_bar=sigma_bar, - delta_sigma=delta_sigma, - A1=A1, - A2=A2, - A3=A3, - ) - if L is None or d2 is None: - ll = -np.inf - else: - logdet = 2.0 * np.sum(np.log(np.diag(L))) - p_local = yk.size - ll = float(-0.5 * (p_local * np.log(2.0 * np.pi) + logdet + d2)) + if L is None or d2 is None: + continue + + # Eq. (A.9): log-likelihood + logdet = 2.0 * np.sum(np.log(np.diag(L))) + ll = -0.5 * (len(yk) * np.log(2*np.pi) + logdet + d2) if ll > best_ll: best_ll = ll - best_tau = float(tau_cand) + best_tau = tau_cand if np.isfinite(best_ll): - tau_hat[k] = best_tau + tau_hat[i] = best_tau return tau_hat - # Select the requested event only, and return tau_hat(k) for that one event. - s_event = np.asarray(S_std.sel({event_dim: event_index}).values, dtype=float) - y_event = np.asarray(y_std.sel({event_dim: event_index}).values, dtype=float) + # Extract event + s_event = S_std.sel({event_dim: event_index}).values + y_event = y_std.sel({event_dim: event_index}).values tau_hat_1d = _tau_hat_for_one_event(s_event, y_event) + k_values = np.arange(fit_start, fit_stop - p + 1) return xr.DataArray( tau_hat_1d, dims=("k",), - coords={"k": np.arange(k_end + 1)}, + coords={"k": k_values}, name="tau_hat", - attrs={ - "long_name": f"MLE tau_hat(k) for {event_dim}={event_index}", - "units": "sample_index_or_time_units_of_t", - }, + attrs={"fit_start": fit_start, "fit_stop": fit_stop} ) def compute_d2_moteki_kondo( @@ -540,12 +570,13 @@ def compute_d2_moteki_kondo( tau_hat: Union[np.ndarray, xr.DataArray], p: int, *, - ch: Optional[str] = None, + ch: str = None, event_index: int, event_dim: str = "event_index", S_sample_dim: Optional[str] = None, y_sample_dim: Optional[str] = None, - k_end: Optional[int] = None, + min_start=15, + width_metric="fwhm", config: Optional[MLEConfig] = None, ) -> xr.DataArray: """ @@ -565,7 +596,7 @@ def compute_d2_moteki_kondo( tau_hat(k) for the selected event. Must have length k_end + 1. p : int Number of consecutive points in each k-subset. - ch : str, optional + ch : str Variable name to select when S and/or norm_deriv are Datasets. event_index : int Event index to evaluate. @@ -575,8 +606,10 @@ def compute_d2_moteki_kondo( Sample dimension in S. y_sample_dim : str, optional Sample dimension in norm_deriv. - k_end : int, optional - Largest starting k. If None, inferred from tau_hat length. + min_start : int + Minimum allowed start index to exclude unusable early samples. + width_metric : str + "fwhm" or "fwtm" for defining peak width. config : MLEConfig Calibration / noise / grid settings. @@ -589,6 +622,7 @@ def compute_d2_moteki_kondo( raise ValueError("config must be provided.") # Convert Datasets to DataArrays if needed. + S_original = S S = _to_dataarray(S, "S", ch=ch) norm_deriv = _to_dataarray(norm_deriv, "norm_deriv", ch=ch) @@ -631,93 +665,60 @@ def compute_d2_moteki_kondo( if p < 2 or p > n_samples: raise ValueError(f"p must be in [2, {n_samples}], got {p}") - # Tau-hat is expected to be a vector over k. - tau_hat_np = np.asarray( - tau_hat.data if isinstance(tau_hat, xr.DataArray) else tau_hat, - dtype=float, + # --- Same peak-based window as MLE --- + fit_start, fit_stop, _ = _resolve_peakfit_window( + S_original, + event_index=event_index, + ch=ch, + event_dim=event_dim, + min_start=min_start, + width_metric=width_metric, + n_samples=n_samples, ) - if tau_hat_np.ndim != 1: - raise ValueError("tau_hat must be 1D.") - - if k_end is None: - k_end = tau_hat_np.size - 1 - if k_end < 0: - raise ValueError(f"k_end must be >= 0, got {k_end}") - if tau_hat_np.size != k_end + 1: - raise ValueError( - f"tau_hat length must equal k_end + 1. Got len(tau_hat)={tau_hat_np.size}, " - f"k_end={k_end}." - ) - if k_end > n_samples - p: - raise ValueError(f"k_end must be in [0, {n_samples - p}], got {k_end}") - # Parameters from Appendix A. + k_values = np.arange(fit_start, fit_stop - p + 1) + tau_hat_np = np.asarray(tau_hat) + + if tau_hat_np.size != k_values.size: + raise ValueError("tau_hat length mismatch with window.") + + # Moteki & Kondo parameters h = float(config.h) sigma_bar = float(config.sigma_bar) delta_sigma = float(config.delta_sigma) A1, A2, A3 = float(config.A1), float(config.A2), float(config.A3) - if h <= 0: - raise ValueError("config.h must be positive.") - if sigma_bar <= 0: - raise ValueError("config.sigma_bar must be positive.") - if delta_sigma < 0: - raise ValueError("config.delta_sigma must be >= 0.") - # Time axis used in the fit. # This should match the time axis used in mle_tau_moteki_kondo. - t = np.arange(n_samples, dtype=float) * h + t = np.arange(n_samples) * h - # Select the requested event only. - s_event = np.asarray(S_std.sel({event_dim: event_index}).values, dtype=float) - y_event = np.asarray(y_std.sel({event_dim: event_index}).values, dtype=float) + s_event = S_std.sel({event_dim: event_index}).values + y_event = y_std.sel({event_dim: event_index}).values - if not (np.all(np.isfinite(s_event)) and np.all(np.isfinite(y_event))): - raise ValueError(f"Selected event_index={event_index} contains non-finite values.") + d2_vals = np.full(k_values.size, np.nan) - d2_vals = np.full(k_end + 1, np.nan, dtype=float) + for i, k in enumerate(k_values): + # Same subset definition as in tau_hat search + yk = y_event[k:k+p] + sk = s_event[k:k+p] + tk = t[k:k+p] - for k in range(k_end + 1): - # Consecutive p-point subset starting at k, exactly as in the tau_hat search. - yk = y_event[k : k + p] - sk = s_event[k : k + p] - tk = t[k : k + p] + tau_k = float(tau_hat_np[i]) - if not (np.all(np.isfinite(yk)) and np.all(np.isfinite(sk))): - continue - - tau_k = float(tau_hat_np[k]) - if not np.isfinite(tau_k): - continue - - # Build the subset mean/covariance using the same equations as the MLE. - # Mean: Eq. (A.4) - # Covariance: Eqs. (A.10a), (A.10b) - # Statistical distance: Eq. (A.11) + # Build mean, covariance, and compute distance [Eqs. A.4, A.10, A.11] _, _, _, d2 = _moteki_kondo_subset_statistics( - yk, - sk, - tk, - tau_k, - h=h, - sigma_bar=sigma_bar, - delta_sigma=delta_sigma, - A1=A1, - A2=A2, - A3=A3, + yk, sk, tk, tau_k, + h=h, sigma_bar=sigma_bar, delta_sigma=delta_sigma, + A1=A1, A2=A2, A3=A3 ) - if d2 is not None and np.isfinite(d2): - d2_vals[k] = d2 + if d2 is not None: + d2_vals[i] = d2 - out = xr.DataArray( + return xr.DataArray( d2_vals, dims=("k",), - coords={"k": np.arange(k_end + 1)}, + coords={"k": k_values}, name="d2", - attrs={ - "long_name": f"Moteki & Kondo statistical distance d^2(k) for event_index={event_index}", - "units": "dimensionless", - }, - ) - return out \ No newline at end of file + attrs={"fit_start": fit_start, "fit_stop": fit_stop} + ) \ No newline at end of file diff --git a/tests/test_ndm.py b/tests/test_ndm.py index c75e678..34014c0 100644 --- a/tests/test_ndm.py +++ b/tests/test_ndm.py @@ -41,38 +41,38 @@ def test_mle_estimate_tau(): A3=6.2e-4, ) - ## Test one event + ## Test one event ################################################## tau = mle_tau_moteki_kondo( S=my_binary, norm_deriv=dSdt, p=10, ch="Data_ch0", event_index=499, + min_start=15, + width_metric="fwhm", config=cfg, ) - tau_val_true = my_binary['Data_ch0'].isel(event_index=499).argmax().item()*0.4e-6 + + tau_val_true = my_binary['Data_ch0'].isel(event_index=499).argmax().item()*0.4e-6 # Test that the estimated tau for a subset of results is close to the true value for the event - for i in range(25, 34): + for i in range(13, 17): np.testing.assert_almost_equal(tau[i], tau_val_true, decimal=6) d2 = compute_d2_moteki_kondo( - S=my_binary, - norm_deriv=dSdt, - tau_hat=tau, - p=10, - ch="Data_ch0", - event_index=499, - config=cfg, + S=my_binary, + norm_deriv=dSdt, + tau_hat=tau, + p=10, + ch="Data_ch0", + event_index=499, + min_start=15, + width_metric="fwhm", + config=cfg, ) - # Define k window - k_start, k_end = 18, 34 - d2_subset = d2.isel(k=slice(k_start, k_end + 1)) - # Find index of minimum d2 within subset - k_min_local = int(d2_subset.argmin(dim="k").item()) - k_min = k_start + k_min_local + k_min_local = int(d2.argmin(dim="k").item()) # Get corresponding tau value - tau_best = tau.isel(k=k_min).item() + tau_best = tau.isel(k=k_min_local).item() # Assert closeness np.testing.assert_allclose( @@ -81,68 +81,83 @@ def test_mle_estimate_tau(): atol=0.01e-05, # absolute tolerance = 1e-7 ) - ## Test another event + ## Test another event ################################################## tau = mle_tau_moteki_kondo( S=my_binary, norm_deriv=dSdt, - p=10, + p=6, ch="Data_ch0", event_index=1040, + min_start=15, + width_metric="fwtm", config=cfg, ) tau_val = my_binary['Data_ch0'].isel(event_index=1040).argmax().item()*0.4e-6 # Test that the estimated tau for a subset of results is close to the true value for the event - for i in range(31, 43): - np.testing.assert_almost_equal(tau[i], tau_val, decimal=6) + for i in range(18, 28): + np.testing.assert_allclose(tau[i], tau_val, atol=0.08e-05) d2 = compute_d2_moteki_kondo( - S=my_binary, - norm_deriv=dSdt, - tau_hat=tau, - p=10, - ch="Data_ch0", - event_index=1040, - config=cfg, + S=my_binary, + norm_deriv=dSdt, + tau_hat=tau, + p=6, + ch="Data_ch0", + event_index=1040, + min_start=15, + width_metric="fwtm", + config=cfg, ) - tau_best = tau.isel(k=35).item() + k_min_local = int(d2.argmin(dim="k").item()) + # Get corresponding tau value + tau_best = tau.isel(k=k_min_local).item() + # Assert closeness np.testing.assert_allclose( tau_best, tau_val, - atol=0.05e-05, # absolute tolerance = 1e-7 + atol=0.04e-05, # absolute tolerance = 4e-7 ) - - ## Test another event + + ## Test another event ################################################## tau = mle_tau_moteki_kondo( S=my_binary, norm_deriv=dSdt, - p=10, + p=8, ch="Data_ch4", event_index=2008, + min_start=15, + width_metric="fwtm", config=cfg, ) d2 = compute_d2_moteki_kondo( - S=my_binary, - norm_deriv=dSdt, - tau_hat=tau, - p=10, - ch="Data_ch4", - event_index=2008, - config=cfg, + S=my_binary, + norm_deriv=dSdt, + tau_hat=tau, + p=8, + ch="Data_ch4", + event_index=2008, + min_start=15, + width_metric="fwtm", + config=cfg, ) # Test that the estimated tau for a subset of results is close to the true value for the event - for i in range(39, 42): - np.testing.assert_almost_equal(tau[i], tau_val_true, decimal=6) + for i in range(2, 4): + np.testing.assert_allclose(tau[i], tau_val_true, atol=0.08e-05) + for i in range(10,15): + np.testing.assert_allclose(tau[i], tau_val_true, atol=0.05e-05) - tau_best = tau.isel(k=39).item() + k_min_local = int(d2.argmin(dim="k").item()) + # Get corresponding tau value + tau_best = tau.isel(k=k_min_local).item() + # Assert closeness np.testing.assert_allclose( tau_best, tau_val_true, - atol=0.02e-04, # absolute tolerance = 2e-6 (larger tolerance for evaporation events) - ) - \ No newline at end of file + atol=0.01e-05, # absolute tolerance = 2e-6 (larger tolerance for evaporation events) + ) \ No newline at end of file