diff --git a/imap_processing/lo/constants.py b/imap_processing/lo/constants.py index 25cea7b72..7220566c9 100644 --- a/imap_processing/lo/constants.py +++ b/imap_processing/lo/constants.py @@ -58,9 +58,9 @@ class LoConstants: # The first matching open interval (low < pivot < high) is used; if none matches, # THRESHOLD_BG_RATE_RAM_DEFAULT / THRESHOLD_BG_RATE_ANTI_RAM_DEFAULT apply. PIVOT_ANGLE_THRESHOLDS: ClassVar[dict[tuple[float, float], tuple[float, float]]] = { - (88.0, 92.0): (0.014, 0.007), - (73.0, 77.0): (0.0175, 0.00875), - (103.0, 107.0): (0.0112, 0.0056), + (88.0, 92.0): (0.028, 0.014), + (73.0, 77.0): (0.035, 0.0175), + (103.0, 107.0): (0.0224, 0.0112), } # Default background-rate thresholds [counts/s] when no pivot range matches. @@ -75,3 +75,18 @@ class LoConstants: # Padding [s] added to begin/end of each goodtime interval to ensure complete # cycles are covered at interval edges. GOODTIME_PADDING: float = 2.0 + + # Star-sensor spin-angle binning offset (fractional bin-index shift used when + # computing sample centers), keyed by the IFB star-sync housekeeping state + # (ifb_ctrl_star_sync). Flight software 4.8 enabled star sync ("EN"), + # switching from binning to the bin center (+0.5) to the left edge (+0.0). + STAR_BIN_OFFSET_BY_SYNC: ClassVar[dict[str | None, float]] = { + "DS": 0.5, # star sync disabled (pre FSW 4.8) + "EN": 0.0, # star sync enabled (FSW 4.8+) + None: 0.5, # default value + } + + # Number of ending bins to exclude from each star-sensor profile average. + STAR_END_BINS_TO_EXCLUDE: int = 2 + # Minimum COUNT value for a star-sensor record to be considered valid. + STAR_MIN_COUNT_THRESHOLD: int = 700 diff --git a/imap_processing/lo/l1b/lo_l1b.py b/imap_processing/lo/l1b/lo_l1b.py index a085a5b36..021ffb6a5 100644 --- a/imap_processing/lo/l1b/lo_l1b.py +++ b/imap_processing/lo/l1b/lo_l1b.py @@ -1986,7 +1986,7 @@ def split_rate_dataset( def filter_valid_star_records( l1a_star: xr.Dataset, - min_count: int = 700, + min_count: int = c.STAR_MIN_COUNT_THRESHOLD, time_window_offset: float = 0.0, time_window_duration: float | None = None, ) -> np.ndarray: @@ -2014,7 +2014,7 @@ def filter_valid_star_records( valid_mask : np.ndarray Boolean array indicating valid records. """ - # Section 5: Acceptance Criteria - COUNT >= 700 + # Section 5: Acceptance Criteria - COUNT >= min_count count_mask = l1a_star["count"].values >= min_count # shcoarse is already in MET seconds @@ -2049,7 +2049,7 @@ def filter_valid_star_records( def calculate_star_sensor_profile_for_group( data: np.ndarray, counts: np.ndarray, - end_bins_to_exclude: int = 2, + end_bins_to_exclude: int = c.STAR_END_BINS_TO_EXCLUDE, ) -> tuple[np.ndarray, np.ndarray]: """ Calculate averaged star sensor amplitude profile for a group of records. @@ -2098,14 +2098,63 @@ def calculate_star_sensor_profile_for_group( return avg_amplitude, count_array +def get_star_bin_offset(l1b_nhk: xr.Dataset, reference_epoch: int) -> float: + """ + Determine the star-sensor binning offset from the IFB star-sync state. + + Reads ``ifb_ctrl_star_sync`` from the NHK housekeeping at the record nearest + on or before ``reference_epoch`` and maps it to a binning offset via + ``LoConstants.STAR_BIN_OFFSET_BY_SYNC``. + + Parameters + ---------- + l1b_nhk : xr.Dataset + L1B NHK dataset containing ``ifb_ctrl_star_sync`` and an ``epoch`` + coordinate (TT2000 nanoseconds since J2000). + reference_epoch : int + Epoch at which to evaluate the sync state, in TT2000 nanoseconds since + J2000. The NHK record in effect at or before this time is used. + + Returns + ------- + bin_offset : float + Fractional bin-index offset to use when computing sample spin-angle + centers. + + Raises + ------ + KeyError + If ``ifb_ctrl_star_sync`` is not present in ``l1b_nhk``. + """ + if "ifb_ctrl_star_sync" not in l1b_nhk: + raise KeyError( + "ifb_ctrl_star_sync field not found in L1B NHK dataset. " + "Cannot determine star-sensor binning offset." + ) + + nhk_epoch = l1b_nhk["epoch"].values + sync_state = l1b_nhk["ifb_ctrl_star_sync"].values + + # Use the housekeeping record in effect at the reference epoch (the last + # NHK sample at or before it), clamping to the first sample if the reference + # epoch falls before NHK coverage. + idx = max(int(np.searchsorted(nhk_epoch, reference_epoch, side="right")) - 1, 0) + state = str(sync_state[idx]) + + offset = c.STAR_BIN_OFFSET_BY_SYNC.get(state, c.STAR_BIN_OFFSET_BY_SYNC[None]) + logger.info(f"Star sync state '{state}' -> bin offset {offset}") + return offset + + def calculate_star_sensor_profiles_by_group( l1a_star: xr.Dataset, sampling_cadence: float, spin_period: float, group_size: int = 64, start_angle_offset: float = 62.0, - end_bins_to_exclude: int = 2, - min_count_threshold: int = 700, + end_bins_to_exclude: int = c.STAR_END_BINS_TO_EXCLUDE, + min_count_threshold: int = c.STAR_MIN_COUNT_THRESHOLD, + bin_offset: float = 0.5, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Calculate averaged star sensor amplitude profiles for groups of records. @@ -2129,6 +2178,10 @@ def calculate_star_sensor_profiles_by_group( Number of ending bins to exclude from each average (default: 2). min_count_threshold : int Minimum COUNT value for valid record (default: 700). + bin_offset : float + Fractional offset applied to bin indices when computing sample + spin-angle centers (default: 0.5). Use 0.5 to bin to the bin center + (pre-FSW 4.8) and 0.0 to bin to the left edge (FSW 4.8 and later). Returns ------- @@ -2152,7 +2205,7 @@ def calculate_star_sensor_profiles_by_group( # Calculate spin angles (same for all groups) deg_per_bin = 360.0 * (sampling_cadence / 1000.0) / spin_period bin_indices = np.arange(720) - sample_centers = (bin_indices + 0.5) * deg_per_bin + sample_centers = (bin_indices + bin_offset) * deg_per_bin spin_angle = (start_angle_offset + sample_centers) % 360.0 if n_valid == 0: @@ -2282,13 +2335,20 @@ def l1b_star( logger.info(f"Using spin duration from spin data: {spin_duration:.6f} s") # TODO: Read from ancillary config file when available - lo_angle_offset = 2.0 - sc_to_inst_angle_offset = ( - 360 * get_spacecraft_to_instrument_spin_phase_offset(SpiceFrame.IMAP_LO) - + lo_angle_offset + sc_to_inst_angle_offset = 360 * get_spacecraft_to_instrument_spin_phase_offset( + SpiceFrame.IMAP_LO ) - end_bins_to_exclude = 2 - min_count_threshold = 700 + end_bins_to_exclude = c.STAR_END_BINS_TO_EXCLUDE + min_count_threshold = c.STAR_MIN_COUNT_THRESHOLD + + # Global epoch times from L1A data (used for start_doy/end_doy below). + global_start_epoch = l1a_star["epoch"].values[0] + global_end_epoch = l1a_star["epoch"].values[-1] + + # Select the star-sensor binning convention from the IFB star-sync state in + # housekeeping. Evaluate at the earliest star record's epoch so a pointing that + # spans the `EN` event uses the value corresponding to the state at its start. + bin_offset = get_star_bin_offset(l1b_nhk, int(global_start_epoch)) # Calculate profiles for each 64-spin group ( @@ -2304,12 +2364,9 @@ def l1b_star( start_angle_offset=sc_to_inst_angle_offset, end_bins_to_exclude=end_bins_to_exclude, min_count_threshold=min_count_threshold, + bin_offset=bin_offset, ) - # Get global epoch times from L1A data for start_doy and end_doy - global_start_epoch = l1a_star["epoch"].values[0] - global_end_epoch = l1a_star["epoch"].values[-1] - # Create dataset with spin_angle as coordinate and multiple epochs group_epochs = met_to_ttj2000ns(group_mets) l1b_star_ds = xr.Dataset( @@ -2375,7 +2432,6 @@ def l1b_star( l1b_star_ds.attrs["pointing_mid_met"] = pointing_mid_met l1b_star_ds.attrs["sampling_cadence_ms"] = sampling_cadence l1b_star_ds.attrs["spin_duration_sec"] = spin_duration - l1b_star_ds.attrs["lo_angle_offset_deg"] = lo_angle_offset l1b_star_ds.attrs["end_bins_excluded"] = end_bins_to_exclude l1b_star_ds.attrs["min_count_threshold"] = min_count_threshold l1b_star_ds.attrs["group_size"] = group_size diff --git a/imap_processing/tests/lo/test_lo_l1b.py b/imap_processing/tests/lo/test_lo_l1b.py index 17d0ae3db..2a8f23039 100644 --- a/imap_processing/tests/lo/test_lo_l1b.py +++ b/imap_processing/tests/lo/test_lo_l1b.py @@ -27,6 +27,7 @@ get_pivot_angle_from_nhk, get_sampling_cadence_from_nhk, get_spin_start_times, + get_star_bin_offset, identify_species, initialize_l1b_de, l1b_bgrates_and_goodtimes, @@ -1836,6 +1837,57 @@ def test_profiles_by_group_angle_wrapping(self, mock_repoint): assert np.any(spin_angle < 100) # Some angles wrapped to lower range +class TestStarBinOffset: + """Tests for the star-sensor binning offset derived from HK star-sync state.""" + + @staticmethod + def _nhk(states): + """Build a minimal NHK dataset with ifb_ctrl_star_sync at 1 Hz.""" + n = len(states) + return xr.Dataset( + {"ifb_ctrl_star_sync": ("epoch", list(states))}, + coords={"epoch": met_to_ttj2000ns(np.arange(n, dtype=np.float64))}, + ) + + @pytest.mark.parametrize("state, expected", [("DS", 0.5), ("EN", 0.0)]) + def test_offset_maps_sync_state(self, state, expected): + """DS (sync disabled) -> 0.5; EN (sync enabled) -> 0.0.""" + nhk = self._nhk([state] * 3) + star_end = int(nhk["epoch"].values[1]) + assert get_star_bin_offset(nhk, star_end) == expected + + def test_uses_state_at_or_before_reference(self): + """Offset reflects the NHK record in effect at the reference epoch.""" + # Sync flips DS->EN at index 2. + nhk = self._nhk(["DS", "DS", "EN", "EN"]) + before = int(nhk["epoch"].values[1]) + at = int(nhk["epoch"].values[2]) + assert get_star_bin_offset(nhk, before) == 0.5 + assert get_star_bin_offset(nhk, at) == 0.0 + + def test_straddling_pointing_uses_start_state(self): + """A pointing that begins before the enable event uses DS (bin center).""" + # Star data starts under DS but ends under EN: evaluating at the start + # (as l1b_star does) yields DS. + nhk = self._nhk(["DS", "DS", "EN", "EN"]) + star_start = int(nhk["epoch"].values[0]) + assert get_star_bin_offset(nhk, star_start) == 0.5 + + def test_missing_field_raises(self): + """A clear error is raised if the star-sync field is absent.""" + nhk = xr.Dataset( + {"ifb_data_interval": ("epoch", [21.0])}, + coords={"epoch": met_to_ttj2000ns([0.0])}, + ) + with pytest.raises(KeyError, match="ifb_ctrl_star_sync"): + get_star_bin_offset(nhk, int(nhk["epoch"].values[0])) + + def test_unexpected_state_defaults(self): + """An unrecognized star-sync state falls back to the default offset (0.5).""" + nhk = self._nhk(["??"]) + assert get_star_bin_offset(nhk, int(nhk["epoch"].values[0])) == 0.5 + + class TestL1bStar: """Tests for l1b_star function.""" @@ -1873,6 +1925,7 @@ def test_initializes_with_spin_data( l1b_nhk = xr.Dataset( { "ifb_data_interval": ("epoch", [21.0] * n_records), + "ifb_ctrl_star_sync": ("epoch", ["DS"] * n_records), }, coords={"epoch": list(range(n_records))}, ) @@ -1953,6 +2006,7 @@ def test_dataset_structure_and_attributes( l1b_nhk = xr.Dataset( { "ifb_data_interval": ("epoch", [21.0]), + "ifb_ctrl_star_sync": ("epoch", ["DS"]), }, coords={"epoch": [0]}, ) @@ -1998,10 +2052,8 @@ def test_dataset_structure_and_attributes( assert l1b_star_ds["count_per_bin"].attrs["VALIDMAX"] == 100000 # Assert - Check processing parameter attributes - assert "lo_angle_offset_deg" in l1b_star_ds.attrs assert "end_bins_excluded" in l1b_star_ds.attrs assert "min_count_threshold" in l1b_star_ds.attrs - assert l1b_star_ds.attrs["lo_angle_offset_deg"] == 2.0 assert l1b_star_ds.attrs["end_bins_excluded"] == 2 assert l1b_star_ds.attrs["min_count_threshold"] == 700 @@ -2035,6 +2087,7 @@ def test_start_and_end_doy_variables( l1b_nhk = xr.Dataset( { "ifb_data_interval": ("epoch", [21.0, 21.0, 21.0]), + "ifb_ctrl_star_sync": ("epoch", ["DS", "DS", "DS"]), }, coords={"epoch": [0, 1, 2]}, ) @@ -2103,6 +2156,7 @@ def test_multiple_groups_created( l1b_nhk = xr.Dataset( { "ifb_data_interval": ("epoch", [21.0] * n_records), + "ifb_ctrl_star_sync": ("epoch", ["DS"] * n_records), }, coords={"epoch": list(range(n_records))}, )