Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions imap_processing/lo/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class LoConstants:
# The first matching open interval (low < pivot < high) is used; if none matches,
# THRESHOLD_BG_RATE_RAM_DEFAULT / THRESHOLD_BG_RATE_ANTI_RAM_DEFAULT apply.
PIVOT_ANGLE_THRESHOLDS: ClassVar[dict[tuple[float, float], tuple[float, float]]] = {
(88.0, 92.0): (0.014, 0.007),
(73.0, 77.0): (0.0175, 0.00875),
(103.0, 107.0): (0.0112, 0.0056),
(88.0, 92.0): (0.028, 0.014),
(73.0, 77.0): (0.035, 0.0175),
(103.0, 107.0): (0.0224, 0.0112),
}

# Default background-rate thresholds [counts/s] when no pivot range matches.
Expand All @@ -75,3 +75,18 @@ class LoConstants:
# Padding [s] added to begin/end of each goodtime interval to ensure complete
# cycles are covered at interval edges.
GOODTIME_PADDING: float = 2.0

# Star-sensor spin-angle binning offset (fractional bin-index shift used when
# computing sample centers), keyed by the IFB star-sync housekeeping state
# (ifb_ctrl_star_sync). Flight software 4.8 enabled star sync ("EN"),
# switching from binning to the bin center (+0.5) to the left edge (+0.0).
STAR_BIN_OFFSET_BY_SYNC: ClassVar[dict[str | None, float]] = {
"DS": 0.5, # star sync disabled (pre FSW 4.8)
"EN": 0.0, # star sync enabled (FSW 4.8+)
None: 0.5, # default value
}

# Number of ending bins to exclude from each star-sensor profile average.
STAR_END_BINS_TO_EXCLUDE: int = 2
# Minimum COUNT value for a star-sensor record to be considered valid.
STAR_MIN_COUNT_THRESHOLD: int = 700
90 changes: 73 additions & 17 deletions imap_processing/lo/l1b/lo_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -1986,7 +1986,7 @@ def split_rate_dataset(

def filter_valid_star_records(
l1a_star: xr.Dataset,
min_count: int = 700,
min_count: int = c.STAR_MIN_COUNT_THRESHOLD,
time_window_offset: float = 0.0,
time_window_duration: float | None = None,
) -> np.ndarray:
Expand Down Expand Up @@ -2014,7 +2014,7 @@ def filter_valid_star_records(
valid_mask : np.ndarray
Boolean array indicating valid records.
"""
# Section 5: Acceptance Criteria - COUNT >= 700
# Section 5: Acceptance Criteria - COUNT >= min_count
count_mask = l1a_star["count"].values >= min_count

# shcoarse is already in MET seconds
Expand Down Expand Up @@ -2049,7 +2049,7 @@ def filter_valid_star_records(
def calculate_star_sensor_profile_for_group(
data: np.ndarray,
counts: np.ndarray,
end_bins_to_exclude: int = 2,
end_bins_to_exclude: int = c.STAR_END_BINS_TO_EXCLUDE,
) -> tuple[np.ndarray, np.ndarray]:
"""
Calculate averaged star sensor amplitude profile for a group of records.
Expand Down Expand Up @@ -2098,14 +2098,63 @@ def calculate_star_sensor_profile_for_group(
return avg_amplitude, count_array


def get_star_bin_offset(l1b_nhk: xr.Dataset, reference_epoch: int) -> float:
"""
Determine the star-sensor binning offset from the IFB star-sync state.

Reads ``ifb_ctrl_star_sync`` from the NHK housekeeping at the record nearest
on or before ``reference_epoch`` and maps it to a binning offset via
``LoConstants.STAR_BIN_OFFSET_BY_SYNC``.

Parameters
----------
l1b_nhk : xr.Dataset
L1B NHK dataset containing ``ifb_ctrl_star_sync`` and an ``epoch``
coordinate (TT2000 nanoseconds since J2000).
reference_epoch : int
Epoch at which to evaluate the sync state, in TT2000 nanoseconds since
J2000. The NHK record in effect at or before this time is used.

Returns
-------
bin_offset : float
Fractional bin-index offset to use when computing sample spin-angle
centers.

Raises
------
KeyError
If ``ifb_ctrl_star_sync`` is not present in ``l1b_nhk``.
"""
if "ifb_ctrl_star_sync" not in l1b_nhk:
raise KeyError(
"ifb_ctrl_star_sync field not found in L1B NHK dataset. "
"Cannot determine star-sensor binning offset."
)

nhk_epoch = l1b_nhk["epoch"].values
sync_state = l1b_nhk["ifb_ctrl_star_sync"].values

# Use the housekeeping record in effect at the reference epoch (the last
# NHK sample at or before it), clamping to the first sample if the reference
# epoch falls before NHK coverage.
idx = max(int(np.searchsorted(nhk_epoch, reference_epoch, side="right")) - 1, 0)
state = str(sync_state[idx])

offset = c.STAR_BIN_OFFSET_BY_SYNC.get(state, c.STAR_BIN_OFFSET_BY_SYNC[None])
logger.info(f"Star sync state '{state}' -> bin offset {offset}")
return offset


def calculate_star_sensor_profiles_by_group(
l1a_star: xr.Dataset,
sampling_cadence: float,
spin_period: float,
group_size: int = 64,
start_angle_offset: float = 62.0,
end_bins_to_exclude: int = 2,
min_count_threshold: int = 700,
end_bins_to_exclude: int = c.STAR_END_BINS_TO_EXCLUDE,
min_count_threshold: int = c.STAR_MIN_COUNT_THRESHOLD,
bin_offset: float = 0.5,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate averaged star sensor amplitude profiles for groups of records.
Expand All @@ -2129,6 +2178,10 @@ def calculate_star_sensor_profiles_by_group(
Number of ending bins to exclude from each average (default: 2).
min_count_threshold : int
Minimum COUNT value for valid record (default: 700).
bin_offset : float
Fractional offset applied to bin indices when computing sample
spin-angle centers (default: 0.5). Use 0.5 to bin to the bin center
(pre-FSW 4.8) and 0.0 to bin to the left edge (FSW 4.8 and later).

Returns
-------
Expand All @@ -2152,7 +2205,7 @@ def calculate_star_sensor_profiles_by_group(
# Calculate spin angles (same for all groups)
deg_per_bin = 360.0 * (sampling_cadence / 1000.0) / spin_period
bin_indices = np.arange(720)
sample_centers = (bin_indices + 0.5) * deg_per_bin
sample_centers = (bin_indices + bin_offset) * deg_per_bin
spin_angle = (start_angle_offset + sample_centers) % 360.0

if n_valid == 0:
Expand Down Expand Up @@ -2282,13 +2335,20 @@ def l1b_star(
logger.info(f"Using spin duration from spin data: {spin_duration:.6f} s")

# TODO: Read from ancillary config file when available
lo_angle_offset = 2.0
sc_to_inst_angle_offset = (
360 * get_spacecraft_to_instrument_spin_phase_offset(SpiceFrame.IMAP_LO)
+ lo_angle_offset
sc_to_inst_angle_offset = 360 * get_spacecraft_to_instrument_spin_phase_offset(
SpiceFrame.IMAP_LO
)
end_bins_to_exclude = 2
min_count_threshold = 700
end_bins_to_exclude = c.STAR_END_BINS_TO_EXCLUDE
min_count_threshold = c.STAR_MIN_COUNT_THRESHOLD

# Global epoch times from L1A data (used for start_doy/end_doy below).
global_start_epoch = l1a_star["epoch"].values[0]
global_end_epoch = l1a_star["epoch"].values[-1]

# Select the star-sensor binning convention from the IFB star-sync state in
# housekeeping. Evaluate at the earliest star record's epoch so a pointing that
# spans the `EN` event uses the value corresponding to the state at its start.
bin_offset = get_star_bin_offset(l1b_nhk, int(global_start_epoch))

# Calculate profiles for each 64-spin group
(
Expand All @@ -2304,12 +2364,9 @@ def l1b_star(
start_angle_offset=sc_to_inst_angle_offset,
end_bins_to_exclude=end_bins_to_exclude,
min_count_threshold=min_count_threshold,
bin_offset=bin_offset,
)

# Get global epoch times from L1A data for start_doy and end_doy
global_start_epoch = l1a_star["epoch"].values[0]
global_end_epoch = l1a_star["epoch"].values[-1]

# Create dataset with spin_angle as coordinate and multiple epochs
group_epochs = met_to_ttj2000ns(group_mets)
l1b_star_ds = xr.Dataset(
Expand Down Expand Up @@ -2375,7 +2432,6 @@ def l1b_star(
l1b_star_ds.attrs["pointing_mid_met"] = pointing_mid_met
l1b_star_ds.attrs["sampling_cadence_ms"] = sampling_cadence
l1b_star_ds.attrs["spin_duration_sec"] = spin_duration
l1b_star_ds.attrs["lo_angle_offset_deg"] = lo_angle_offset
l1b_star_ds.attrs["end_bins_excluded"] = end_bins_to_exclude
l1b_star_ds.attrs["min_count_threshold"] = min_count_threshold
l1b_star_ds.attrs["group_size"] = group_size
Expand Down
58 changes: 56 additions & 2 deletions imap_processing/tests/lo/test_lo_l1b.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
get_pivot_angle_from_nhk,
get_sampling_cadence_from_nhk,
get_spin_start_times,
get_star_bin_offset,
identify_species,
initialize_l1b_de,
l1b_bgrates_and_goodtimes,
Expand Down Expand Up @@ -1836,6 +1837,57 @@ def test_profiles_by_group_angle_wrapping(self, mock_repoint):
assert np.any(spin_angle < 100) # Some angles wrapped to lower range


class TestStarBinOffset:
"""Tests for the star-sensor binning offset derived from HK star-sync state."""

@staticmethod
def _nhk(states):
"""Build a minimal NHK dataset with ifb_ctrl_star_sync at 1 Hz."""
n = len(states)
return xr.Dataset(
{"ifb_ctrl_star_sync": ("epoch", list(states))},
coords={"epoch": met_to_ttj2000ns(np.arange(n, dtype=np.float64))},
)

@pytest.mark.parametrize("state, expected", [("DS", 0.5), ("EN", 0.0)])
def test_offset_maps_sync_state(self, state, expected):
"""DS (sync disabled) -> 0.5; EN (sync enabled) -> 0.0."""
nhk = self._nhk([state] * 3)
star_end = int(nhk["epoch"].values[1])
assert get_star_bin_offset(nhk, star_end) == expected

def test_uses_state_at_or_before_reference(self):
"""Offset reflects the NHK record in effect at the reference epoch."""
# Sync flips DS->EN at index 2.
nhk = self._nhk(["DS", "DS", "EN", "EN"])
before = int(nhk["epoch"].values[1])
at = int(nhk["epoch"].values[2])
assert get_star_bin_offset(nhk, before) == 0.5
assert get_star_bin_offset(nhk, at) == 0.0

def test_straddling_pointing_uses_start_state(self):
"""A pointing that begins before the enable event uses DS (bin center)."""
# Star data starts under DS but ends under EN: evaluating at the start
# (as l1b_star does) yields DS.
nhk = self._nhk(["DS", "DS", "EN", "EN"])
star_start = int(nhk["epoch"].values[0])
assert get_star_bin_offset(nhk, star_start) == 0.5

def test_missing_field_raises(self):
"""A clear error is raised if the star-sync field is absent."""
nhk = xr.Dataset(
{"ifb_data_interval": ("epoch", [21.0])},
coords={"epoch": met_to_ttj2000ns([0.0])},
)
with pytest.raises(KeyError, match="ifb_ctrl_star_sync"):
get_star_bin_offset(nhk, int(nhk["epoch"].values[0]))

def test_unexpected_state_defaults(self):
"""An unrecognized star-sync state falls back to the default offset (0.5)."""
nhk = self._nhk(["??"])
assert get_star_bin_offset(nhk, int(nhk["epoch"].values[0])) == 0.5


class TestL1bStar:
"""Tests for l1b_star function."""

Expand Down Expand Up @@ -1873,6 +1925,7 @@ def test_initializes_with_spin_data(
l1b_nhk = xr.Dataset(
{
"ifb_data_interval": ("epoch", [21.0] * n_records),
"ifb_ctrl_star_sync": ("epoch", ["DS"] * n_records),
},
coords={"epoch": list(range(n_records))},
)
Expand Down Expand Up @@ -1953,6 +2006,7 @@ def test_dataset_structure_and_attributes(
l1b_nhk = xr.Dataset(
{
"ifb_data_interval": ("epoch", [21.0]),
"ifb_ctrl_star_sync": ("epoch", ["DS"]),
},
coords={"epoch": [0]},
)
Expand Down Expand Up @@ -1998,10 +2052,8 @@ def test_dataset_structure_and_attributes(
assert l1b_star_ds["count_per_bin"].attrs["VALIDMAX"] == 100000

# Assert - Check processing parameter attributes
assert "lo_angle_offset_deg" in l1b_star_ds.attrs
assert "end_bins_excluded" in l1b_star_ds.attrs
assert "min_count_threshold" in l1b_star_ds.attrs
assert l1b_star_ds.attrs["lo_angle_offset_deg"] == 2.0
assert l1b_star_ds.attrs["end_bins_excluded"] == 2
assert l1b_star_ds.attrs["min_count_threshold"] == 700

Expand Down Expand Up @@ -2035,6 +2087,7 @@ def test_start_and_end_doy_variables(
l1b_nhk = xr.Dataset(
{
"ifb_data_interval": ("epoch", [21.0, 21.0, 21.0]),
"ifb_ctrl_star_sync": ("epoch", ["DS", "DS", "DS"]),
},
coords={"epoch": [0, 1, 2]},
)
Expand Down Expand Up @@ -2103,6 +2156,7 @@ def test_multiple_groups_created(
l1b_nhk = xr.Dataset(
{
"ifb_data_interval": ("epoch", [21.0] * n_records),
"ifb_ctrl_star_sync": ("epoch", ["DS"] * n_records),
},
coords={"epoch": list(range(n_records))},
)
Expand Down
Loading