diff --git a/docs/source/advanced.rst b/docs/source/advanced.rst index 3cbee2b..5a54a01 100644 --- a/docs/source/advanced.rst +++ b/docs/source/advanced.rst @@ -22,28 +22,28 @@ the recording of the SigMF logo used in this example `from the specification signal = sigmffile.fromfile(path) # Get some metadata and all annotations - sample_rate = signal.get_global_field(SigMFFile.SAMPLE_RATE_KEY) + sample_rate = signal.get_global_field(sigmf.SAMPLE_RATE_KEY) sample_count = signal.sample_count signal_duration = sample_count / sample_rate annotations = signal.get_annotations() # Iterate over annotations for adx, annotation in enumerate(annotations): - annotation_start_idx = annotation[SigMFFile.START_INDEX_KEY] - annotation_length = annotation[SigMFFile.LENGTH_INDEX_KEY] + annotation_start_idx = annotation[sigmf.SAMPLE_START_KEY] + annotation_length = annotation[sigmf.SAMPLE_COUNT_KEY] annotation_comment = annotation.get( - SigMFFile.COMMENT_KEY, "[annotation {}]".format(adx) + sigmf.COMMENT_KEY, "[annotation {}]".format(adx) ) # Get capture info associated with the start of annotation capture = signal.get_capture_info(annotation_start_idx) - freq_center = capture.get(SigMFFile.FREQUENCY_KEY, 0) + freq_center = capture.get(sigmf.FREQUENCY_KEY, 0) freq_min = freq_center - 0.5 * sample_rate freq_max = freq_center + 0.5 * sample_rate # Get frequency edges of annotation (default to edges of capture) - freq_start = annotation.get(SigMFFile.FLO_KEY) - freq_stop = annotation.get(SigMFFile.FHI_KEY) + freq_start = annotation.get(sigmf.FREQ_LOWER_EDGE_KEY) + freq_stop = annotation.get(sigmf.FREQ_UPPER_EDGE_KEY) # Get the samples corresponding to annotation samples = signal.read_samples(annotation_start_idx, annotation_length) @@ -74,10 +74,10 @@ First, create a single SigMF Recording and save it to disk: meta = SigMFFile( data_file="example_cf32.sigmf-data", # extension is optional global_info={ - SigMFFile.DATATYPE_KEY: get_data_type_str(data), # in this case, 'cf32_le' - SigMFFile.SAMPLE_RATE_KEY: 48000, - SigMFFile.AUTHOR_KEY: "jane.doe@domain.org", - SigMFFile.DESCRIPTION_KEY: "All zero complex float32 example file.", + sigmf.DATATYPE_KEY: get_data_type_str(data), # in this case, 'cf32_le' + sigmf.SAMPLE_RATE_KEY: 48000, + sigmf.AUTHOR_KEY: "jane.doe@domain.org", + sigmf.DESCRIPTION_KEY: "All zero complex float32 example file.", }, ) @@ -85,8 +85,8 @@ First, create a single SigMF Recording and save it to disk: meta.add_capture( 0, metadata={ - SigMFFile.FREQUENCY_KEY: 915000000, - SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), + sigmf.FREQUENCY_KEY: 915000000, + sigmf.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), }, ) @@ -95,9 +95,9 @@ First, create a single SigMF Recording and save it to disk: 100, 200, metadata={ - SigMFFile.FLO_KEY: 914995000.0, - SigMFFile.FHI_KEY: 915005000.0, - SigMFFile.COMMENT_KEY: "example annotation", + sigmf.FREQ_LOWER_EDGE_KEY: 914995000.0, + sigmf.FREQ_UPPER_EDGE_KEY: 915005000.0, + sigmf.COMMENT_KEY: "example annotation", }, ) @@ -120,9 +120,9 @@ Now lets add another SigMF Recording and associate them with a SigMF Collection: meta_ci16 = SigMFFile( data_file="example_ci16.sigmf-data", # extension is optional global_info={ - SigMFFile.DATATYPE_KEY: "ci16_le", # get_data_type_str() is only valid for numpy types - SigMFFile.SAMPLE_RATE_KEY: 48000, - SigMFFile.DESCRIPTION_KEY: "All zero complex int16 file.", + sigmf.DATATYPE_KEY: "ci16_le", # get_data_type_str() is only valid for numpy types + sigmf.SAMPLE_RATE_KEY: 48000, + sigmf.DESCRIPTION_KEY: "All zero complex int16 file.", }, ) meta_ci16.add_capture(0, metadata=meta.get_capture_info(0)) @@ -155,59 +155,36 @@ The SigMF Collection and its associated Recordings can now be loaded like this: Load a SigMF Archive and slice without untaring ----------------------------------------------- -Since an *archive* is a tarball (uncompressed by default), and since there are many -excellent tools for manipulating tar files, it's fairly straightforward to -access the *data* part of a SigMF archive without un-taring it. This is a -compelling feature because **1** archives make it harder for the ``-data`` and -the ``-meta`` to get separated, and **2** some datasets are so large that it -can be impractical (due to available disk space, or slow network speeds if the -archive file resides on a network file share) or simply obnoxious to untar it -first. +Since an *archive* is a tarball (uncompressed by default), you can access the +*data* part of a SigMF archive without un-taring it. This is a compelling +feature because **1** archives make it harder for the ``-data`` and the +``-meta`` to get separated, and **2** some datasets are so large that it can be +impractical (due to available disk space, or slow network speeds if the archive +file resides on a network file share) or simply obnoxious to untar it first. :: >>> import sigmf - >>> arc = sigmf.SigMFArchiveReader('/src/LTE.sigmf') - >>> arc.shape + >>> signal = sigmf.fromarchive('/src/LTE.sigmf') + >>> signal.shape (15379532,) - >>> arc.ndim + >>> signal.ndim 1 - >>> arc[:10] - array([-20.+11.j, -21. -6.j, -17.-20.j, -13.-52.j, 0.-75.j, 22.-58.j, - 48.-44.j, 49.-60.j, 31.-56.j, 23.-47.j], dtype=complex64) + >>> signal[:10] + array([-0.023+0.012j, -0.021-0.006j, -0.017-0.020j, -0.013-0.052j, + 0.000-0.075j, 0.022-0.058j, 0.048-0.044j, 0.049-0.060j, + 0.031-0.056j, 0.023-0.047j], dtype=complex64) -The preceeding example exhibits another feature of this approach; the archive -``LTE.sigmf`` is actually ``complex-int16``'s on disk, for which there is no -corresponding type in ``numpy``. However, the ``.sigmffile`` member keeps track of -this, and converts the data to ``numpy.complex64`` *after* slicing it, that is, -after reading it from disk. +Archives can contain fixed-point data types like ``complex-int16`` (``ci16``), +which have no direct ``numpy`` equivalent. By default, this data is automatically +scaled to floating-point values in the range ``[-1.0, 1.0]`` and returned as +``numpy.complex64``: :: - >>> arc.sigmffile.get_global_field(sigmf.SigMFFile.DATATYPE_KEY) + >>> signal.get_global_field(sigmf.DATATYPE_KEY) 'ci16_le' - >>> arc.sigmffile._memmap.dtype - dtype('int16') - - >>> arc.sigmffile._return_type - '>> import sigmf, io - >>> sigmf_bytes = io.BytesIO(open('/src/LTE.sigmf', 'rb').read()) - >>> arc = sigmf.SigMFArchiveReader(archive_buffer=sigmf_bytes) - >>> arc[:10] - array([-20.+11.j, -21. -6.j, -17.-20.j, -13.-52.j, 0.-75.j, 22.-58.j, - 48.-44.j, 49.-60.j, 31.-56.j, 23.-47.j], dtype=complex64) - ------------------------------ Compressed SigMF Archives ------------------------------ @@ -248,7 +225,7 @@ The file extension determines the archive format: >>> signal = sigmf.fromfile('recording.sigmf.xz') >>> signal[:10] - array([-20.+11.j, ...], dtype=complex64) + array([-0.023+0.012j, -0.021-0.006j, ...], dtype=complex64) **Memory behavior:** diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 6d9a292..76662d8 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -91,10 +91,10 @@ For full control over global fields, captures, and annotations: meta = SigMFFile( data_file="example.sigmf-data", # extension is optional global_info={ - SigMFFile.DATATYPE_KEY: get_data_type_str(data), # in this case, "cf32_le" - SigMFFile.SAMPLE_RATE_KEY: 48000, - SigMFFile.AUTHOR_KEY: "jane.doe@domain.org", - SigMFFile.DESCRIPTION_KEY: "All zero complex float32 example file.", + sigmf.DATATYPE_KEY: get_data_type_str(data), # in this case, "cf32_le" + sigmf.SAMPLE_RATE_KEY: 48000, + sigmf.AUTHOR_KEY: "jane.doe@domain.org", + sigmf.DESCRIPTION_KEY: "All zero complex float32 example file.", }, ) @@ -102,8 +102,8 @@ For full control over global fields, captures, and annotations: meta.add_capture( 0, metadata={ - SigMFFile.FREQUENCY_KEY: 915000000, - SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), + sigmf.FREQUENCY_KEY: 915000000, + sigmf.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), }, ) @@ -112,9 +112,9 @@ For full control over global fields, captures, and annotations: 100, 200, metadata={ - SigMFFile.FLO_KEY: 914995000.0, - SigMFFile.FHI_KEY: 915005000.0, - SigMFFile.COMMENT_KEY: "example annotation", + sigmf.FREQ_LOWER_EDGE_KEY: 914995000.0, + sigmf.FREQ_UPPER_EDGE_KEY: 915005000.0, + sigmf.COMMENT_KEY: "example annotation", }, ) diff --git a/sigmf/__init__.py b/sigmf/__init__.py index fca8ff4..d313b68 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.10.0" +__version__ = "1.11.0" # matching version of the SigMF specification __specification__ = "1.2.6" @@ -13,6 +13,7 @@ archive, archivereader, error, + keys, schema, siggen, sigmffile, @@ -21,5 +22,6 @@ ) from .archive import SigMFArchive from .archivereader import SigMFArchiveReader +from .keys import * # noqa: F401, F403 from .siggen import SigMFGenerator from .sigmffile import SigMFCollection, SigMFFile, fromarchive, fromarray, fromfile diff --git a/sigmf/archive.py b/sigmf/archive.py index 0d4a22e..4a8ebb3 100644 --- a/sigmf/archive.py +++ b/sigmf/archive.py @@ -14,21 +14,14 @@ from pathlib import Path from .error import SigMFFileError, SigMFFileExistsError - -SIGMF_ARCHIVE_EXT = ".sigmf" -SIGMF_METADATA_EXT = ".sigmf-meta" -SIGMF_DATASET_EXT = ".sigmf-data" -SIGMF_COLLECTION_EXT = ".sigmf-collection" - -SIGMF_COMPRESSED_EXTS = { - # compression type -> unique compound extension - "gz": ".sigmf.gz", - "xz": ".sigmf.xz", - "zip": ".sigmf.zip", -} - -# all recognized archive extensions (uncompressed + compressed) -SIGMF_ARCHIVE_EXTS = {SIGMF_ARCHIVE_EXT} | set(SIGMF_COMPRESSED_EXTS.values()) +from .keys import ( + SIGMF_ARCHIVE_EXT, + SIGMF_ARCHIVE_EXTS, + SIGMF_COLLECTION_EXT, + SIGMF_COMPRESSED_EXTS, + SIGMF_DATASET_EXT, + SIGMF_METADATA_EXT, +) def _detect_compression(path): @@ -83,6 +76,38 @@ def _get_archive_basename(path): return path.stem +def _get_archive_basename(path): + """Get the archive base name (without any sigmf archive extension). + + Parameters + ---------- + path : Path + Archive file path. + + Returns + ------- + str + Base name without sigmf extension. + + Examples + -------- + >>> _get_archive_basename(Path("recording.sigmf")) + 'recording' + >>> _get_archive_basename(Path("recording.sigmf.gz")) + 'recording' + >>> _get_archive_basename(Path("my.recording.sigmf.zip")) + 'my.recording' + """ + name = path.name + # check compound extensions first (longest match) + for ext in sorted(SIGMF_COMPRESSED_EXTS.values(), key=len, reverse=True): + if name.endswith(ext): + return name[: -len(ext)] + if name.endswith(SIGMF_ARCHIVE_EXT): + return name[: -len(SIGMF_ARCHIVE_EXT)] + return path.stem + + class SigMFArchive: """ Archive a SigMFFile into a tar or zip file, optionally with compression. @@ -152,7 +177,6 @@ def __init__(self, sigmffile, name=None, fileobj=None, compression=None, overwri with open(meta_path, "w") as handle: self.sigmffile.dump(handle) if isinstance(self.sigmffile.data_buffer, io.BytesIO): - self.sigmffile.data_file = data_path with open(data_path, "wb") as handle: handle.write(self.sigmffile.data_buffer.getbuffer()) else: diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py index 5f13c63..1133716 100644 --- a/sigmf/archivereader.py +++ b/sigmf/archivereader.py @@ -11,16 +11,16 @@ import zipfile from pathlib import Path -from . import __version__ -from .archive import ( +from . import __version__, keys +from .archive import _detect_compression +from .error import SigMFFileError +from .hashing import calculate_sha512 +from .keys import ( SIGMF_ARCHIVE_EXT, SIGMF_ARCHIVE_EXTS, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, - _detect_compression, ) -from .error import SigMFFileError -from .hashing import calculate_sha512 from .sigmffile import SigMFFile @@ -116,6 +116,8 @@ def _read_tar_obj(self, tar_obj): with tar_obj.extractfile(memb) as fid: data_buffer = io.BytesIO(fid.read()) + if json_contents is None: + raise SigMFFileError("No .sigmf-meta file found in archive!") if data_buffer is None: raise SigMFFileError("No .sigmf-data file found in archive!") return json_contents, data_buffer, data_size_bytes @@ -151,6 +153,8 @@ def _read_zip_obj(self, zf): data_size_bytes = len(raw) data_buffer = io.BytesIO(raw) + if json_contents is None: + raise SigMFFileError("No .sigmf-meta file found in archive!") if data_buffer is None: raise SigMFFileError("No .sigmf-data file found in archive!") return json_contents, data_buffer, data_size_bytes @@ -188,6 +192,8 @@ def _init_from_tar_memmap(self, path, skip_checksum, map_readonly, autoscale): tar_obj.close() + if json_contents is None: + raise SigMFFileError("No .sigmf-meta file found in archive!") if data_offset is None: raise SigMFFileError("No .sigmf-data file found in archive!") @@ -197,10 +203,10 @@ def _init_from_tar_memmap(self, path, skip_checksum, map_readonly, autoscale): # compute hash of data portion only (not full tar file) if not skip_checksum: data_hash = calculate_sha512(filename=path, offset=data_offset, size=data_size_bytes) - old_hash = self.sigmffile.get_global_field(SigMFFile.HASH_KEY) + old_hash = self.sigmffile.get_global_field(keys.SHA512_KEY) if old_hash is not None and old_hash != data_hash: raise SigMFFileError("Calculated file hash does not match associated metadata.") - self.sigmffile.set_global_field(SigMFFile.HASH_KEY, data_hash) + self.sigmffile.set_global_field(keys.SHA512_KEY, data_hash) # memmap directly into the tar file at the data offset self.sigmffile.set_data_file( @@ -212,8 +218,8 @@ def _init_from_tar_memmap(self, path, skip_checksum, map_readonly, autoscale): ) # set_data_file sets DATASET_KEY for non-.sigmf-data files (NCD), # but the tar archive path is not a dataset — clear it - if SigMFFile.DATASET_KEY in self.sigmffile.get_global_info(): - del self.sigmffile._metadata[SigMFFile.GLOBAL_KEY][SigMFFile.DATASET_KEY] + if keys.DATASET_KEY in self.sigmffile.get_global_info(): + del self.sigmffile._metadata[SigMFFile.GLOBAL_KEY][keys.DATASET_KEY] self.ndim = self.sigmffile.ndim self.shape = self.sigmffile.shape diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index c92650a..8bd0385 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -25,6 +25,7 @@ import numpy as np from packaging.version import InvalidVersion, Version +from .. import keys from ..error import SigMFConversionError from ..sigmffile import SigMFFile, fromfile, get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT @@ -521,19 +522,19 @@ def get_tag(tag): # base global metadata global_info = { - SigMFFile.AUTHOR_KEY: getpass.getuser(), - SigMFFile.DATATYPE_KEY: datatype, - SigMFFile.RECORDER_KEY: "Official SigMF BLUE converter", - SigMFFile.NUM_CHANNELS_KEY: num_channels, - SigMFFile.SAMPLE_RATE_KEY: sample_rate_hz, - SigMFFile.EXTENSIONS_KEY: [{"name": "blue", "version": "0.0.1", "optional": True}], - SigMFFile.DESCRIPTION_KEY: _description(h_fixed), + keys.AUTHOR_KEY: getpass.getuser(), + keys.DATATYPE_KEY: datatype, + keys.RECORDER_KEY: "Official SigMF BLUE converter", + keys.NUM_CHANNELS_KEY: num_channels, + keys.SAMPLE_RATE_KEY: sample_rate_hz, + keys.EXTENSIONS_KEY: [{"name": "blue", "version": "0.0.1", "optional": True}], + keys.DESCRIPTION_KEY: _description(h_fixed), } # add NCD-specific fields if is_ncd: - global_info[SigMFFile.DATASET_KEY] = blue_file_name - global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes + global_info[keys.DATASET_KEY] = blue_file_name + global_info[keys.TRAILING_BYTES_KEY] = trailing_bytes # merge HCB values into metadata global_info["blue:fixed"] = h_fixed @@ -571,11 +572,11 @@ def get_tag(tag): # timecode uses 1950-01-01 as epoch, datetime uses 1970-01-01 blue_epoch = blue_start_time - 631152000 # seconds between 1950 and 1970 blue_datetime = datetime.fromtimestamp(blue_epoch, tz=timezone.utc) - capture_info[SigMFFile.DATETIME_KEY] = blue_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT) + capture_info[keys.DATETIME_KEY] = blue_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT) if get_tag("RF_FREQ") is not None: # it's possible other keys indicate tune frequency, but RF_FREQ is common - capture_info[SigMFFile.FREQUENCY_KEY] = float(get_tag("RF_FREQ")) + capture_info[keys.FREQUENCY_KEY] = float(get_tag("RF_FREQ")) return global_info, capture_info @@ -703,12 +704,12 @@ def construct_sigmf( # set metadata-only flag for zero-sample files (only for non-NCD files) if is_metadata_only: # ensure we're not accidentally setting metadata_only for an NCD - if SigMFFile.DATASET_KEY in global_info: + if keys.DATASET_KEY in global_info: raise ValueError( "Cannot set metadata_only=True for Non-Conforming Dataset files. " "Per SigMF spec, metadata_only MAY NOT be used with core:dataset field." ) - global_info[SigMFFile.METADATA_ONLY_KEY] = True + global_info[keys.METADATA_ONLY_KEY] = True # for metadata-only files, don't specify data_file and skip checksum if is_metadata_only: @@ -776,7 +777,7 @@ def construct_sigmf_ncd( ) # add NCD-specific capture info - capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + capture_info[keys.HEADER_BYTES_KEY] = header_bytes # create NCD metadata-only SigMF pointing to original file meta = SigMFFile(global_info=global_info, skip_checksum=True) diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 95ffb0c..e0b2044 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -18,7 +18,7 @@ import defusedxml.ElementTree as ET import numpy as np -from .. import SigMFFile, fromfile +from .. import SigMFFile, fromfile, keys from ..error import SigMFConversionError from ..sigmffile import get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT @@ -240,13 +240,13 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: # base global metadata global_md = { - SigMFFile.AUTHOR_KEY: getpass.getuser(), - SigMFFile.DATATYPE_KEY: data_type, - SigMFFile.HW_KEY: hardware_description, - SigMFFile.NUM_CHANNELS_KEY: 1, - SigMFFile.RECORDER_KEY: "Official SigMF Signal Hound converter", - SigMFFile.SAMPLE_RATE_KEY: sample_rate, - SigMFFile.EXTENSIONS_KEY: [{"name": "spike", "version": "0.0.1", "optional": True}], + keys.AUTHOR_KEY: getpass.getuser(), + keys.DATATYPE_KEY: data_type, + keys.HW_KEY: hardware_description, + keys.NUM_CHANNELS_KEY: 1, + keys.RECORDER_KEY: "Official SigMF Signal Hound converter", + keys.SAMPLE_RATE_KEY: sample_rate, + keys.EXTENSIONS_KEY: [{"name": "spike", "version": "0.0.1", "optional": True}], } # add optional spike-specific fields to global metadata using spike: namespace @@ -264,10 +264,10 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: # capture info capture_info = { - SigMFFile.FREQUENCY_KEY: center_frequency, + keys.FREQUENCY_KEY: center_frequency, } if iso_8601_string: - capture_info[SigMFFile.DATETIME_KEY] = iso_8601_string + capture_info[keys.DATETIME_KEY] = iso_8601_string # create annotations array using calculated values annotations = [] @@ -276,11 +276,11 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: lower_frequency_edge = center_frequency - (if_bandwidth / 2.0) annotations.append( { - SigMFFile.START_INDEX_KEY: 0, - SigMFFile.LENGTH_INDEX_KEY: sample_count_calculated, - SigMFFile.FLO_KEY: lower_frequency_edge, - SigMFFile.FHI_KEY: upper_frequency_edge, - SigMFFile.LABEL_KEY: "Spike", + keys.SAMPLE_START_KEY: 0, + keys.SAMPLE_COUNT_KEY: sample_count_calculated, + keys.FREQ_LOWER_EDGE_KEY: lower_frequency_edge, + keys.FREQ_UPPER_EDGE_KEY: upper_frequency_edge, + keys.LABEL_KEY: "Spike", } ) @@ -326,10 +326,10 @@ def convert_iq_data(xml_path: Path, sample_count: int) -> np.ndarray: def _add_annotations(meta: SigMFFile, annotations: list) -> None: for annotation in annotations: - start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) - length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + start_idx = annotation.get(keys.SAMPLE_START_KEY, 0) + length = annotation.get(keys.SAMPLE_COUNT_KEY) annot_metadata = { - k: v for k, v in annotation.items() if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + k: v for k, v in annotation.items() if k not in [keys.SAMPLE_START_KEY, keys.SAMPLE_COUNT_KEY] } meta.add_annotation(start_idx, length=length, metadata=annot_metadata) @@ -388,9 +388,9 @@ def signalhound_to_sigmf( # create NCD if specified, otherwise create standard SigMF dataset or archive if create_ncd: # spike files have no header or trailing bytes - global_info[SigMFFile.DATASET_KEY] = signalhound_path.with_suffix(".iq").name - global_info[SigMFFile.TRAILING_BYTES_KEY] = 0 - capture_info[SigMFFile.HEADER_BYTES_KEY] = 0 + global_info[keys.DATASET_KEY] = signalhound_path.with_suffix(".iq").name + global_info[keys.TRAILING_BYTES_KEY] = 0 + capture_info[keys.HEADER_BYTES_KEY] = 0 # build the .iq file path for data file base_file_name = signalhound_path.with_suffix("") diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py index 2b715e1..8e1dec9 100644 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -18,7 +18,7 @@ from .. import SigMFFile from .. import __version__ as toolversion -from .. import fromfile +from .. import fromfile, keys from ..error import SigMFFileExistsError from ..sigmffile import get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT, get_data_type_str @@ -145,25 +145,25 @@ def wav_to_sigmf( datatype_str = get_data_type_str(wav_data) global_info = { - SigMFFile.DATATYPE_KEY: datatype_str, - SigMFFile.DESCRIPTION_KEY: f"converted from {wav_path.name}", - SigMFFile.NUM_CHANNELS_KEY: n_channels, - SigMFFile.RECORDER_KEY: "Official SigMF WAV converter", - SigMFFile.SAMPLE_RATE_KEY: samp_rate, + keys.DATATYPE_KEY: datatype_str, + keys.DESCRIPTION_KEY: f"converted from {wav_path.name}", + keys.NUM_CHANNELS_KEY: n_channels, + keys.RECORDER_KEY: "Official SigMF WAV converter", + keys.SAMPLE_RATE_KEY: samp_rate, } modify_time = wav_path.lstat().st_mtime wav_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) capture_info = { - SigMFFile.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), + keys.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), } if create_ncd: # NCD requires extra fields - global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes - global_info[SigMFFile.DATASET_KEY] = wav_path.name - capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + global_info[keys.TRAILING_BYTES_KEY] = trailing_bytes + global_info[keys.DATASET_KEY] = wav_path.name + capture_info[keys.HEADER_BYTES_KEY] = header_bytes # create metadata-only SigMF for NCD pointing to original file meta = SigMFFile(global_info=global_info) diff --git a/sigmf/keys.py b/sigmf/keys.py new file mode 100644 index 0000000..6c3dd3c --- /dev/null +++ b/sigmf/keys.py @@ -0,0 +1,222 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""SigMF metadata key and file extension constants.""" + +# --------------------------------------------------------------------------- +# capture / annotation sample indexing keys +# --------------------------------------------------------------------------- +SAMPLE_START_KEY = "core:sample_start" +SAMPLE_COUNT_KEY = "core:sample_count" +GLOBAL_INDEX_KEY = "core:global_index" +OFFSET_KEY = "core:offset" + +# --------------------------------------------------------------------------- +# data specification keys +# --------------------------------------------------------------------------- +NUM_CHANNELS_KEY = "core:num_channels" +SHA512_KEY = "core:sha512" +VERSION_KEY = "core:version" +DATATYPE_KEY = "core:datatype" +FREQUENCY_KEY = "core:frequency" +HEADER_BYTES_KEY = "core:header_bytes" +FREQ_LOWER_EDGE_KEY = "core:freq_lower_edge" +FREQ_UPPER_EDGE_KEY = "core:freq_upper_edge" +SAMPLE_RATE_KEY = "core:sample_rate" +TRAILING_BYTES_KEY = "core:trailing_bytes" + +# --------------------------------------------------------------------------- +# metadata / descriptive keys +# --------------------------------------------------------------------------- +COMMENT_KEY = "core:comment" +DESCRIPTION_KEY = "core:description" +AUTHOR_KEY = "core:author" +META_DOI_KEY = "core:meta_doi" +DATA_DOI_KEY = "core:data_doi" +GENERATOR_KEY = "core:generator" +LABEL_KEY = "core:label" +RECORDER_KEY = "core:recorder" +LICENSE_KEY = "core:license" +HW_KEY = "core:hw" +DATASET_KEY = "core:dataset" +METADATA_ONLY_KEY = "core:metadata_only" +EXTENSIONS_KEY = "core:extensions" +DATETIME_KEY = "core:datetime" +UUID_KEY = "core:uuid" + +# --------------------------------------------------------------------------- +# location keys +# --------------------------------------------------------------------------- +LATITUDE_KEY = "core:latitude" +LONGITUDE_KEY = "core:longitude" +GEOLOCATION_KEY = "core:geolocation" + +# --------------------------------------------------------------------------- +# cross-reference keys +# --------------------------------------------------------------------------- +COLLECTION_KEY = "core:collection" + +# --------------------------------------------------------------------------- +# collection-specific field keys +# --------------------------------------------------------------------------- +COLLECTION_DOI_KEY = "core:collection_doi" +STREAMS_KEY = "core:streams" + +# --------------------------------------------------------------------------- +# valid key lists per section +# --------------------------------------------------------------------------- +VALID_GLOBAL_KEYS = [ + AUTHOR_KEY, + COLLECTION_KEY, + DATASET_KEY, + DATATYPE_KEY, + DATA_DOI_KEY, + DESCRIPTION_KEY, + EXTENSIONS_KEY, + GEOLOCATION_KEY, + SHA512_KEY, + HW_KEY, + LICENSE_KEY, + META_DOI_KEY, + METADATA_ONLY_KEY, + NUM_CHANNELS_KEY, + RECORDER_KEY, + SAMPLE_RATE_KEY, + OFFSET_KEY, + TRAILING_BYTES_KEY, + VERSION_KEY, +] + +VALID_CAPTURE_KEYS = [ + DATETIME_KEY, + FREQUENCY_KEY, + GLOBAL_INDEX_KEY, + HEADER_BYTES_KEY, + SAMPLE_START_KEY, +] + +VALID_ANNOTATION_KEYS = [ + COMMENT_KEY, + FREQ_UPPER_EDGE_KEY, + FREQ_LOWER_EDGE_KEY, + GENERATOR_KEY, + LABEL_KEY, + LATITUDE_KEY, + LONGITUDE_KEY, + SAMPLE_COUNT_KEY, + SAMPLE_START_KEY, + UUID_KEY, +] + +VALID_COLLECTION_KEYS = [ + AUTHOR_KEY, + COLLECTION_DOI_KEY, + DESCRIPTION_KEY, + EXTENSIONS_KEY, + LICENSE_KEY, + STREAMS_KEY, + VERSION_KEY, +] + +# --------------------------------------------------------------------------- +# file extension constants +# --------------------------------------------------------------------------- +SIGMF_ARCHIVE_EXT = ".sigmf" +SIGMF_METADATA_EXT = ".sigmf-meta" +SIGMF_DATASET_EXT = ".sigmf-data" +SIGMF_COLLECTION_EXT = ".sigmf-collection" + +SIGMF_COMPRESSED_EXTS = { + "gz": ".sigmf.gz", + "xz": ".sigmf.xz", + "zip": ".sigmf.zip", +} + +# all recognized archive extensions (uncompressed + compressed) +SIGMF_ARCHIVE_EXTS = {SIGMF_ARCHIVE_EXT} | set(SIGMF_COMPRESSED_EXTS.values()) + +# all SigMF file suffixes +SIGMF_SUFFIXES = [ + SIGMF_DATASET_EXT, + SIGMF_METADATA_EXT, + SIGMF_ARCHIVE_EXT, + SIGMF_COLLECTION_EXT, +] + +# --------------------------------------------------------------------------- +# deprecated alias map — used by _SigMFDeprecatingMeta in sigmffile.py +# maps old_name -> (new_name, value) +# --------------------------------------------------------------------------- +_DEPRECATED_ALIASES = { + "START_INDEX_KEY": ("SAMPLE_START_KEY", SAMPLE_START_KEY), + "LENGTH_INDEX_KEY": ("SAMPLE_COUNT_KEY", SAMPLE_COUNT_KEY), + "START_OFFSET_KEY": ("OFFSET_KEY", OFFSET_KEY), + "HASH_KEY": ("SHA512_KEY", SHA512_KEY), + "FLO_KEY": ("FREQ_LOWER_EDGE_KEY", FREQ_LOWER_EDGE_KEY), + "FHI_KEY": ("FREQ_UPPER_EDGE_KEY", FREQ_UPPER_EDGE_KEY), + "LAT_KEY": ("LATITUDE_KEY", LATITUDE_KEY), + "LON_KEY": ("LONGITUDE_KEY", LONGITUDE_KEY), +} + +# --------------------------------------------------------------------------- +# public exports +# --------------------------------------------------------------------------- +__all__ = [ + # sample indexing + "SAMPLE_START_KEY", + "SAMPLE_COUNT_KEY", + "GLOBAL_INDEX_KEY", + "OFFSET_KEY", + # data specification + "NUM_CHANNELS_KEY", + "SHA512_KEY", + "VERSION_KEY", + "DATATYPE_KEY", + "FREQUENCY_KEY", + "HEADER_BYTES_KEY", + "FREQ_LOWER_EDGE_KEY", + "FREQ_UPPER_EDGE_KEY", + "SAMPLE_RATE_KEY", + "TRAILING_BYTES_KEY", + # metadata / descriptive + "COMMENT_KEY", + "DESCRIPTION_KEY", + "AUTHOR_KEY", + "META_DOI_KEY", + "DATA_DOI_KEY", + "GENERATOR_KEY", + "LABEL_KEY", + "RECORDER_KEY", + "LICENSE_KEY", + "HW_KEY", + "DATASET_KEY", + "METADATA_ONLY_KEY", + "EXTENSIONS_KEY", + "DATETIME_KEY", + "UUID_KEY", + # location + "LATITUDE_KEY", + "LONGITUDE_KEY", + "GEOLOCATION_KEY", + # cross-reference + "COLLECTION_KEY", + # collection-specific + "COLLECTION_DOI_KEY", + "STREAMS_KEY", + # valid key lists + "VALID_GLOBAL_KEYS", + "VALID_CAPTURE_KEYS", + "VALID_ANNOTATION_KEYS", + "VALID_COLLECTION_KEYS", + # file extensions + "SIGMF_ARCHIVE_EXT", + "SIGMF_METADATA_EXT", + "SIGMF_DATASET_EXT", + "SIGMF_COLLECTION_EXT", + "SIGMF_COMPRESSED_EXTS", + "SIGMF_ARCHIVE_EXTS", + "SIGMF_SUFFIXES", +] diff --git a/sigmf/siggen.py b/sigmf/siggen.py index 5397c55..453d3c8 100644 --- a/sigmf/siggen.py +++ b/sigmf/siggen.py @@ -11,6 +11,7 @@ import numpy as np +from . import keys from .error import SigMFGeneratorError from .sigmffile import SigMFFile from .utils import get_data_type_str, get_sigmf_iso8601_datetime_now @@ -492,9 +493,9 @@ def _build_annotations(self, samples: np.ndarray) -> list: # base annotation common to all components base_annotation = { - SigMFFile.START_INDEX_KEY: start_sample, - SigMFFile.LENGTH_INDEX_KEY: end_sample - start_sample, - SigMFFile.GENERATOR_KEY: generator_name, + keys.SAMPLE_START_KEY: start_sample, + keys.SAMPLE_COUNT_KEY: end_sample - start_sample, + keys.GENERATOR_KEY: generator_name, } if component["type"] == "tone": @@ -504,9 +505,9 @@ def _build_annotations(self, samples: np.ndarray) -> list: base_annotation.update( { - SigMFFile.FLO_KEY: total_freq - bandwidth / 2, - SigMFFile.FHI_KEY: total_freq + bandwidth / 2, - SigMFFile.LABEL_KEY: f"tone at {base_freq:.0f} Hz", + keys.FREQ_LOWER_EDGE_KEY: total_freq - bandwidth / 2, + keys.FREQ_UPPER_EDGE_KEY: total_freq + bandwidth / 2, + keys.LABEL_KEY: f"tone at {base_freq:.0f} Hz", } ) @@ -516,9 +517,9 @@ def _build_annotations(self, samples: np.ndarray) -> list: base_annotation.update( { - SigMFFile.FLO_KEY: min(start_freq, end_freq), - SigMFFile.FHI_KEY: max(start_freq, end_freq), - SigMFFile.LABEL_KEY: f"sweep from {component['start_frequency_hz']:.0f} to {component['end_frequency_hz']:.0f} Hz", + keys.FREQ_LOWER_EDGE_KEY: min(start_freq, end_freq), + keys.FREQ_UPPER_EDGE_KEY: max(start_freq, end_freq), + keys.LABEL_KEY: f"sweep from {component['start_frequency_hz']:.0f} to {component['end_frequency_hz']:.0f} Hz", } ) @@ -526,15 +527,15 @@ def _build_annotations(self, samples: np.ndarray) -> list: # add user comment to first component if provided if self._comment is not None and len(annotations) > 0: - annotations[0][SigMFFile.COMMENT_KEY] = self._comment + annotations[0][keys.COMMENT_KEY] = self._comment # helper function to create full-signal annotations def create_full_signal_annotation(label: str) -> dict: return { - SigMFFile.START_INDEX_KEY: 0, - SigMFFile.LENGTH_INDEX_KEY: len(samples), - SigMFFile.GENERATOR_KEY: generator_name, - SigMFFile.LABEL_KEY: label, + keys.SAMPLE_START_KEY: 0, + keys.SAMPLE_COUNT_KEY: len(samples), + keys.GENERATOR_KEY: generator_name, + keys.LABEL_KEY: label, } # noise annotation if snr was applied @@ -542,8 +543,8 @@ def create_full_signal_annotation(label: str) -> dict: noise_annotation = create_full_signal_annotation(f"AWGN {self._snr_db:.1f} dB SNR") noise_annotation.update( { - SigMFFile.FLO_KEY: 0.0, - SigMFFile.FHI_KEY: self._sample_rate_hz / 2, # full nyquist bandwidth + keys.FREQ_LOWER_EDGE_KEY: 0.0, + keys.FREQ_UPPER_EDGE_KEY: self._sample_rate_hz / 2, # full nyquist bandwidth } ) annotations.append(noise_annotation) @@ -560,7 +561,7 @@ def create_full_signal_annotation(label: str) -> dict: annotations.append(phase_annotation) # sort annotations by sample_start to satisfy sigmf ordering requirement - annotations.sort(key=lambda a: a[SigMFFile.START_INDEX_KEY]) + annotations.sort(key=lambda a: a[keys.SAMPLE_START_KEY]) return annotations @@ -595,27 +596,27 @@ def _build_metadata(self, samples: np.ndarray) -> dict: # create metadata structure global_info = { - SigMFFile.DATATYPE_KEY: get_data_type_str(samples), - SigMFFile.SAMPLE_RATE_KEY: self._sample_rate_hz, - SigMFFile.NUM_CHANNELS_KEY: 1, - SigMFFile.RECORDER_KEY: generator_info, - SigMFFile.DESCRIPTION_KEY: self._description, + keys.DATATYPE_KEY: get_data_type_str(samples), + keys.SAMPLE_RATE_KEY: self._sample_rate_hz, + keys.NUM_CHANNELS_KEY: 1, + keys.RECORDER_KEY: generator_info, + keys.DESCRIPTION_KEY: self._description, } if self._author is not None: - global_info[SigMFFile.AUTHOR_KEY] = self._author + global_info[keys.AUTHOR_KEY] = self._author # create capture info capture_info = { - SigMFFile.START_INDEX_KEY: 0, - SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), + keys.SAMPLE_START_KEY: 0, + keys.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), } # add frequency if there's a single dominant tone component tone_components = [c for c in self._signal_components if c["type"] == "tone"] if len(tone_components) == 1 and len(self._signal_components) == 1: dominant_freq = tone_components[0]["frequency_hz"] + self._frequency_offset_hz - capture_info[SigMFFile.FREQUENCY_KEY] = dominant_freq + capture_info[keys.FREQUENCY_KEY] = dominant_freq # create annotations for signal components annotations = self._build_annotations(samples) diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 4077422..4bcea4d 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -15,13 +15,8 @@ import numpy as np -from . import __specification__, __version__, hashing, schema, validate +from . import __specification__, __version__, hashing, keys, schema, validate from .archive import ( - SIGMF_ARCHIVE_EXT, - SIGMF_COLLECTION_EXT, - SIGMF_COMPRESSED_EXTS, - SIGMF_DATASET_EXT, - SIGMF_METADATA_EXT, SigMFArchive, _detect_compression, _get_archive_basename, @@ -33,10 +28,64 @@ SigMFFileError, SigMFFileExistsError, ) +from .keys import ( + SIGMF_ARCHIVE_EXT, + SIGMF_COLLECTION_EXT, + SIGMF_COMPRESSED_EXTS, + SIGMF_DATASET_EXT, + SIGMF_METADATA_EXT, +) from .utils import dict_merge, get_data_type_str -class SigMFMetafile: +class _DeprecatingKey: + """Descriptor that emits DeprecationWarning when a field key constant is + accessed from the class or an instance, directing users to the sigmf module level. + Each attribute name only warns once per interpreter session.""" + + _warned: set = set() + + def __init__(self, value): + self._value = value + self._attr_name = None + + def __set_name__(self, owner, name): + self._attr_name = name + + def __get__(self, obj, objtype=None): + if self._attr_name not in _DeprecatingKey._warned: + _DeprecatingKey._warned.add(self._attr_name) + cls_name = (objtype or type(obj)).__name__ + warnings.warn( + f"{cls_name}.{self._attr_name} is deprecated and will be removed in a future version, " + f"use sigmf.{self._attr_name}", + DeprecationWarning, + stacklevel=2, + ) + return self._value + + +class _SigMFDeprecatingMeta(type): + """Metaclass that emits DeprecationWarning for renamed key constants.""" + + _warned: set = set() + + def __getattr__(cls, name): + if name in keys._DEPRECATED_ALIASES: + new_name, value = keys._DEPRECATED_ALIASES[name] + if name not in _SigMFDeprecatingMeta._warned: + _SigMFDeprecatingMeta._warned.add(name) + warnings.warn( + f"{cls.__name__}.{name} is deprecated and will be removed in a future version, " + f"use sigmf.{new_name} or {cls.__name__}.{new_name}", + DeprecationWarning, + stacklevel=2, + ) + return value + raise AttributeError(f"type object '{cls.__name__}' has no attribute '{name}'") + + +class SigMFMetafile(metaclass=_SigMFDeprecatingMeta): VALID_KEYS = {} def __init__(self): @@ -119,76 +168,48 @@ def dumps(self, pretty=True): class SigMFFile(SigMFMetafile): - START_INDEX_KEY = "core:sample_start" - LENGTH_INDEX_KEY = "core:sample_count" - GLOBAL_INDEX_KEY = "core:global_index" - START_OFFSET_KEY = "core:offset" - NUM_CHANNELS_KEY = "core:num_channels" - HASH_KEY = "core:sha512" - VERSION_KEY = "core:version" - DATATYPE_KEY = "core:datatype" - FREQUENCY_KEY = "core:frequency" - HEADER_BYTES_KEY = "core:header_bytes" - FLO_KEY = "core:freq_lower_edge" - FHI_KEY = "core:freq_upper_edge" - SAMPLE_RATE_KEY = "core:sample_rate" - COMMENT_KEY = "core:comment" - DESCRIPTION_KEY = "core:description" - AUTHOR_KEY = "core:author" - META_DOI_KEY = "core:meta_doi" - DATA_DOI_KEY = "core:data_doi" - GENERATOR_KEY = "core:generator" - LABEL_KEY = "core:label" - RECORDER_KEY = "core:recorder" - LICENSE_KEY = "core:license" - HW_KEY = "core:hw" - DATASET_KEY = "core:dataset" - TRAILING_BYTES_KEY = "core:trailing_bytes" - METADATA_ONLY_KEY = "core:metadata_only" - EXTENSIONS_KEY = "core:extensions" - DATETIME_KEY = "core:datetime" - LAT_KEY = "core:latitude" - LON_KEY = "core:longitude" - UUID_KEY = "core:uuid" - GEOLOCATION_KEY = "core:geolocation" - COLLECTION_KEY = "core:collection" + # field key constants — access via sigmf.XYZ_KEY; class-level access is deprecated + SAMPLE_START_KEY = _DeprecatingKey(keys.SAMPLE_START_KEY) + SAMPLE_COUNT_KEY = _DeprecatingKey(keys.SAMPLE_COUNT_KEY) + GLOBAL_INDEX_KEY = _DeprecatingKey(keys.GLOBAL_INDEX_KEY) + OFFSET_KEY = _DeprecatingKey(keys.OFFSET_KEY) + NUM_CHANNELS_KEY = _DeprecatingKey(keys.NUM_CHANNELS_KEY) + SHA512_KEY = _DeprecatingKey(keys.SHA512_KEY) + VERSION_KEY = _DeprecatingKey(keys.VERSION_KEY) + DATATYPE_KEY = _DeprecatingKey(keys.DATATYPE_KEY) + FREQUENCY_KEY = _DeprecatingKey(keys.FREQUENCY_KEY) + HEADER_BYTES_KEY = _DeprecatingKey(keys.HEADER_BYTES_KEY) + FREQ_LOWER_EDGE_KEY = _DeprecatingKey(keys.FREQ_LOWER_EDGE_KEY) + FREQ_UPPER_EDGE_KEY = _DeprecatingKey(keys.FREQ_UPPER_EDGE_KEY) + SAMPLE_RATE_KEY = _DeprecatingKey(keys.SAMPLE_RATE_KEY) + COMMENT_KEY = _DeprecatingKey(keys.COMMENT_KEY) + DESCRIPTION_KEY = _DeprecatingKey(keys.DESCRIPTION_KEY) + AUTHOR_KEY = _DeprecatingKey(keys.AUTHOR_KEY) + META_DOI_KEY = _DeprecatingKey(keys.META_DOI_KEY) + DATA_DOI_KEY = _DeprecatingKey(keys.DATA_DOI_KEY) + GENERATOR_KEY = _DeprecatingKey(keys.GENERATOR_KEY) + LABEL_KEY = _DeprecatingKey(keys.LABEL_KEY) + RECORDER_KEY = _DeprecatingKey(keys.RECORDER_KEY) + LICENSE_KEY = _DeprecatingKey(keys.LICENSE_KEY) + HW_KEY = _DeprecatingKey(keys.HW_KEY) + DATASET_KEY = _DeprecatingKey(keys.DATASET_KEY) + TRAILING_BYTES_KEY = _DeprecatingKey(keys.TRAILING_BYTES_KEY) + METADATA_ONLY_KEY = _DeprecatingKey(keys.METADATA_ONLY_KEY) + EXTENSIONS_KEY = _DeprecatingKey(keys.EXTENSIONS_KEY) + DATETIME_KEY = _DeprecatingKey(keys.DATETIME_KEY) + LATITUDE_KEY = _DeprecatingKey(keys.LATITUDE_KEY) + LONGITUDE_KEY = _DeprecatingKey(keys.LONGITUDE_KEY) + UUID_KEY = _DeprecatingKey(keys.UUID_KEY) + GEOLOCATION_KEY = _DeprecatingKey(keys.GEOLOCATION_KEY) + COLLECTION_KEY = _DeprecatingKey(keys.COLLECTION_KEY) + # section structure keys — kept class-level, not promoted to module level GLOBAL_KEY = "global" CAPTURE_KEY = "captures" ANNOTATION_KEY = "annotations" - VALID_GLOBAL_KEYS = [ - AUTHOR_KEY, - COLLECTION_KEY, - DATASET_KEY, - DATATYPE_KEY, - DATA_DOI_KEY, - DESCRIPTION_KEY, - EXTENSIONS_KEY, - GEOLOCATION_KEY, - HASH_KEY, - HW_KEY, - LICENSE_KEY, - META_DOI_KEY, - METADATA_ONLY_KEY, - NUM_CHANNELS_KEY, - RECORDER_KEY, - SAMPLE_RATE_KEY, - START_OFFSET_KEY, - TRAILING_BYTES_KEY, - VERSION_KEY, - ] - VALID_CAPTURE_KEYS = [DATETIME_KEY, FREQUENCY_KEY, HEADER_BYTES_KEY, GLOBAL_INDEX_KEY, START_INDEX_KEY] - VALID_ANNOTATION_KEYS = [ - COMMENT_KEY, - FHI_KEY, - FLO_KEY, - GENERATOR_KEY, - LABEL_KEY, - LAT_KEY, - LENGTH_INDEX_KEY, - LON_KEY, - START_INDEX_KEY, - UUID_KEY, - ] + # valid key lists + VALID_GLOBAL_KEYS = keys.VALID_GLOBAL_KEYS + VALID_CAPTURE_KEYS = keys.VALID_CAPTURE_KEYS + VALID_ANNOTATION_KEYS = keys.VALID_ANNOTATION_KEYS VALID_KEYS = {GLOBAL_KEY: VALID_GLOBAL_KEYS, CAPTURE_KEY: VALID_CAPTURE_KEYS, ANNOTATION_KEY: VALID_ANNOTATION_KEYS} def __init__( @@ -333,7 +354,7 @@ def __getitem__(self, sli): # apply autoscaling for fixed-point data when autoscale=True if self.autoscale: - dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY)) + dtype = dtype_info(self.get_global_field(keys.DATATYPE_KEY)) if dtype["is_fixedpoint"]: # extract scaling parameters is_unsigned_data = dtype["is_unsigned"] @@ -400,11 +421,11 @@ def _is_conforming_dataset(self): ------- `True` if the dataset is conforming to SigMF, `False` otherwise """ - if self.get_global_field(self.TRAILING_BYTES_KEY, 0): + if self.get_global_field(keys.TRAILING_BYTES_KEY, 0): return False for capture in self.get_captures(): # check for any non-zero `header_bytes` fields in captures segments - if capture.get(self.HEADER_BYTES_KEY, 0): + if capture.get(keys.HEADER_BYTES_KEY, 0): return False if self.data_file is not None and not self.data_file.is_file: return False @@ -429,9 +450,9 @@ def _get_ncd_offset(self): # check if this is an NCD with core:dataset and header_bytes captures = self.get_captures() - dataset_field = self.get_global_field(self.DATASET_KEY) - if dataset_field and captures and self.HEADER_BYTES_KEY in captures[0]: - return captures[0][self.HEADER_BYTES_KEY] + dataset_field = self.get_global_field(keys.DATASET_KEY) + if dataset_field and captures and keys.HEADER_BYTES_KEY in captures[0]: + return captures[0][keys.HEADER_BYTES_KEY] return 0 @@ -439,7 +460,7 @@ def get_schema(self): """ Return a schema object valid for the current metadata """ - current_metadata_version = self.get_global_info().get(self.VERSION_KEY) + current_metadata_version = self.get_global_info().get(keys.VERSION_KEY) if self.version != current_metadata_version or self.schema is None: self.version = current_metadata_version self.schema = schema.get_schema(self.version) @@ -462,13 +483,13 @@ def set_metadata(self, metadata): raise SigMFError("Unable to interpret provided metadata.") # ensure fields required for parsing are present or use defaults - if self.get_global_field(self.NUM_CHANNELS_KEY) is None: - self.set_global_field(self.NUM_CHANNELS_KEY, 1) - if self.get_global_field(self.START_OFFSET_KEY) is None: - self.set_global_field(self.START_OFFSET_KEY, 0) + if self.get_global_field(keys.NUM_CHANNELS_KEY) is None: + self.set_global_field(keys.NUM_CHANNELS_KEY, 1) + if self.get_global_field(keys.OFFSET_KEY) is None: + self.set_global_field(keys.OFFSET_KEY, 0) # set version to current implementation - self.set_global_field(self.VERSION_KEY, __specification__) + self.set_global_field(keys.VERSION_KEY, __specification__) def set_global_info(self, new_global): """ @@ -507,11 +528,11 @@ def add_capture(self, start_index, metadata=None): raise SigMFAccessError("Capture start_index cannot be less than dataset start offset.") capture_list = self._metadata[self.CAPTURE_KEY] new_capture = metadata or {} - new_capture[self.START_INDEX_KEY] = start_index + new_capture[keys.SAMPLE_START_KEY] = start_index # merge if capture exists merged = False for idx, existing_capture in enumerate(self._metadata[self.CAPTURE_KEY]): - if existing_capture[self.START_INDEX_KEY] == start_index: + if existing_capture[keys.SAMPLE_START_KEY] == start_index: self._metadata[self.CAPTURE_KEY][idx] = dict_merge(existing_capture, new_capture) merged = True if not merged: @@ -519,7 +540,7 @@ def add_capture(self, start_index, metadata=None): # sort captures by start_index self._metadata[self.CAPTURE_KEY] = sorted( capture_list, - key=lambda item: item[self.START_INDEX_KEY], + key=lambda item: item[keys.SAMPLE_START_KEY], ) def get_captures(self): @@ -539,7 +560,7 @@ def get_capture_info(self, index): raise SigMFAccessError("No captures in metadata.") cap_info = captures[0] for capture in captures: - if capture[self.START_INDEX_KEY] > index: + if capture[keys.SAMPLE_START_KEY] > index: break cap_info = capture return cap_info @@ -549,9 +570,9 @@ def get_capture_start(self, index): Returns a the start sample index of a given capture, will raise SigMFAccessError if this field is missing. """ - start = self.get_captures()[index].get(self.START_INDEX_KEY) + start = self.get_captures()[index].get(keys.SAMPLE_START_KEY) if start is None: - raise SigMFAccessError("Capture {} does not have required {} key".format(index, self.START_INDEX_KEY)) + raise SigMFAccessError("Capture {} does not have required {} key".format(index, keys.SAMPLE_START_KEY)) return start def get_capture_byte_boundaries(self, index): @@ -568,7 +589,7 @@ def get_capture_byte_boundaries(self, index): start_byte = 0 prev_start_sample = 0 for ii, capture in enumerate(self.get_captures()): - start_byte += capture.get(self.HEADER_BYTES_KEY, 0) + start_byte += capture.get(keys.HEADER_BYTES_KEY, 0) start_byte += (self.get_capture_start(ii) - prev_start_sample) * self.get_sample_size() * self.num_channels prev_start_sample = self.get_capture_start(ii) if ii >= index: @@ -584,7 +605,7 @@ def get_capture_byte_boundaries(self, index): file_size = len(self.data_buffer.getbuffer()) else: raise SigMFFileError("Neither data_file nor data_buffer is available") - end_byte = file_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) + end_byte = file_size - self.get_global_field(keys.TRAILING_BYTES_KEY, 0) else: end_byte += ( (self.get_capture_start(index + 1) - self.get_capture_start(index)) @@ -610,17 +631,17 @@ def add_annotation(self, start_index, length=None, metadata=None): raise SigMFAccessError("Annotation start_index cannot be less than dataset start offset.") new_annot = metadata or {} - new_annot[self.START_INDEX_KEY] = start_index + new_annot[keys.SAMPLE_START_KEY] = start_index if length is not None: if length <= 0: raise SigMFAccessError("Annotation `length` must be >= 0") - new_annot[self.LENGTH_INDEX_KEY] = length + new_annot[keys.SAMPLE_COUNT_KEY] = length self._metadata[self.ANNOTATION_KEY] += [new_annot] # sort annotations by start_index self._metadata[self.ANNOTATION_KEY] = sorted( self._metadata[self.ANNOTATION_KEY], - key=lambda item: item[self.START_INDEX_KEY], + key=lambda item: item[keys.SAMPLE_START_KEY], ) def get_annotations(self, index=None): @@ -644,12 +665,12 @@ def get_annotations(self, index=None): annotations_including_index = [] for annotation in annotations: - if index < annotation[self.START_INDEX_KEY]: + if index < annotation[keys.SAMPLE_START_KEY]: # index is before annotation starts -> skip continue - if self.LENGTH_INDEX_KEY in annotation: + if keys.SAMPLE_COUNT_KEY in annotation: # Annotation includes sample_count -> check end index - if index >= annotation[self.START_INDEX_KEY] + annotation[self.LENGTH_INDEX_KEY]: + if index >= annotation[keys.SAMPLE_START_KEY] + annotation[keys.SAMPLE_COUNT_KEY]: # index is after annotation end -> skip continue @@ -680,14 +701,14 @@ def _count_samples(self): sample_bytes = self.data_size_bytes else: # calculate from file size, subtracting header and trailing bytes - header_bytes = sum([c.get(self.HEADER_BYTES_KEY, 0) for c in self.get_captures()]) + header_bytes = sum([c.get(keys.HEADER_BYTES_KEY, 0) for c in self.get_captures()]) if self.data_file is not None: file_bytes = self.data_file.stat().st_size elif self.data_buffer is not None: file_bytes = len(self.data_buffer.getbuffer()) else: file_bytes = 0 - sample_bytes = file_bytes - self.get_global_field(self.TRAILING_BYTES_KEY, 0) - header_bytes + sample_bytes = file_bytes - self.get_global_field(keys.TRAILING_BYTES_KEY, 0) - header_bytes total_sample_size = self.get_sample_size() * self.num_channels sample_count, remainder = divmod(sample_bytes, total_sample_size) @@ -708,12 +729,12 @@ def _get_sample_count_from_annotations(self): """ annon_sample_count = [] for annon in self.get_annotations(): - if self.LENGTH_INDEX_KEY in annon: + if keys.SAMPLE_COUNT_KEY in annon: # Annotation with sample_count - annon_sample_count.append(annon[self.START_INDEX_KEY] + annon[self.LENGTH_INDEX_KEY]) + annon_sample_count.append(annon[keys.SAMPLE_START_KEY] + annon[keys.SAMPLE_COUNT_KEY]) else: # Annotation without sample_count - sample count must be at least sample_start - annon_sample_count.append(annon[self.START_INDEX_KEY]) + annon_sample_count.append(annon[keys.SAMPLE_START_KEY]) if annon_sample_count: return max(annon_sample_count) @@ -725,7 +746,7 @@ def calculate_hash(self): Calculates the hash of the data file and adds it to the global section. Also returns a string representation of the hash. """ - old_hash = self.get_global_field(self.HASH_KEY) + old_hash = self.get_global_field(keys.SHA512_KEY) if self.data_file is not None: new_hash = hashing.calculate_sha512(filename=self.data_file) else: @@ -734,7 +755,7 @@ def calculate_hash(self): if old_hash != new_hash: raise SigMFFileError("Calculated file hash does not match associated metadata.") - self.set_global_field(self.HASH_KEY, new_hash) + self.set_global_field(keys.SHA512_KEY, new_hash) return new_hash def set_data_file( @@ -744,7 +765,7 @@ def set_data_file( Set the datafile path, then recalculate sample count. Update the hash and return the hash string if enabled. """ - if self.get_global_field(self.DATATYPE_KEY) is None: + if self.get_global_field(keys.DATATYPE_KEY) is None: raise SigMFFileError("Error setting data file, the DATATYPE_KEY must be set in the global metadata first.") self.data_file = Path(data_file) if data_file else None @@ -753,7 +774,7 @@ def set_data_file( self.data_size_bytes = size_bytes self._count_samples() - dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY)) + dtype = dtype_info(self.get_global_field(keys.DATATYPE_KEY)) self.is_complex_data = dtype["is_complex"] num_channels = self.num_channels self.ndim = 1 if (num_channels < 2) else 2 @@ -789,7 +810,7 @@ def set_data_file( file_name = self.data_file.name ext = self.data_file.suffix if ext.lower() != SIGMF_DATASET_EXT: - self.set_global_field(SigMFFile.DATASET_KEY, file_name) + self.set_global_field(keys.DATASET_KEY, file_name) if skip_checksum: return None @@ -951,7 +972,7 @@ def read_samples(self, start_index=0, count=-1): elif start_index + count > self.sample_count: raise IOError("Cannot read beyond EOF.") if self.data_file is None and not isinstance(self.data_buffer, io.BytesIO): - if self.get_global_field(self.METADATA_ONLY_KEY, False): + if self.get_global_field(keys.METADATA_ONLY_KEY, False): # only if data_file is `None` allows access to dynamically generated datsets raise SigMFFileError("Cannot read samples from a metadata only distribution.") else: @@ -963,7 +984,7 @@ def _read_datafile(self, first_byte, nitems): """ internal function for reading samples from datafile """ - dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY)) + dtype = dtype_info(self.get_global_field(keys.DATATYPE_KEY)) self.is_complex_data = dtype["is_complex"] is_fixedpoint_data = dtype["is_fixedpoint"] is_unsigned_data = dtype["is_unsigned"] @@ -1008,23 +1029,18 @@ def _read_datafile(self, first_byte, nitems): class SigMFCollection(SigMFMetafile): - VERSION_KEY = "core:version" - DESCRIPTION_KEY = "core:description" - AUTHOR_KEY = "core:author" - COLLECTION_DOI_KEY = "core:collection_doi" - LICENSE_KEY = "core:license" - EXTENSIONS_KEY = "core:extensions" - STREAMS_KEY = "core:streams" + # field key constants — access via sigmf.XYZ_KEY; class-level access is deprecated + VERSION_KEY = _DeprecatingKey(keys.VERSION_KEY) + DESCRIPTION_KEY = _DeprecatingKey(keys.DESCRIPTION_KEY) + AUTHOR_KEY = _DeprecatingKey(keys.AUTHOR_KEY) + COLLECTION_DOI_KEY = _DeprecatingKey(keys.COLLECTION_DOI_KEY) + LICENSE_KEY = _DeprecatingKey(keys.LICENSE_KEY) + EXTENSIONS_KEY = _DeprecatingKey(keys.EXTENSIONS_KEY) + STREAMS_KEY = _DeprecatingKey(keys.STREAMS_KEY) + # section structure key — kept class-level (value differs from SigMFFile.COLLECTION_KEY) COLLECTION_KEY = "collection" - VALID_COLLECTION_KEYS = [ - AUTHOR_KEY, - COLLECTION_DOI_KEY, - DESCRIPTION_KEY, - EXTENSIONS_KEY, - LICENSE_KEY, - STREAMS_KEY, - VERSION_KEY, - ] + # valid key lists + VALID_COLLECTION_KEYS = keys.VALID_COLLECTION_KEYS VALID_KEYS = {COLLECTION_KEY: VALID_COLLECTION_KEYS} def __init__( @@ -1061,7 +1077,7 @@ def __init__( if metadata is None: self._metadata = {self.COLLECTION_KEY: {}} - self._metadata[self.COLLECTION_KEY][self.STREAMS_KEY] = [] + self._metadata[self.COLLECTION_KEY][keys.STREAMS_KEY] = [] else: self._metadata = metadata @@ -1071,7 +1087,7 @@ def __init__( self.set_streams(metafiles) # set version to current implementation - self.set_collection_field(self.VERSION_KEY, __specification__) + self.set_collection_field(keys.VERSION_KEY, __specification__) if not self.skip_checksums: self.verify_stream_hashes() @@ -1091,7 +1107,7 @@ def verify_stream_hashes(self) -> None: SigMFFileError If any dataset checksums do not match saved metadata. """ - streams = self.get_collection_field(self.STREAMS_KEY, []) + streams = self.get_collection_field(keys.STREAMS_KEY, []) for stream in streams: old_hash = stream.get("hash") metafile_name = get_sigmf_filenames(stream.get("name"))["meta_fn"] @@ -1120,13 +1136,13 @@ def set_streams(self, metafiles) -> None: streams.append(stream) else: raise SigMFFileError(f"Specifed stream file {metafile_path} is not a valid SigMF Metadata file") - self.set_collection_field(self.STREAMS_KEY, streams) + self.set_collection_field(keys.STREAMS_KEY, streams) def get_stream_names(self) -> list: """ Returns a list of `name` object(s) from the `collection` level `core:streams` metadata. """ - return [s.get("name") for s in self.get_collection_field(self.STREAMS_KEY, [])] + return [s.get("name") for s in self.get_collection_field(keys.STREAMS_KEY, [])] def set_collection_info(self, new_collection: dict) -> None: """ @@ -1279,26 +1295,26 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): 3. Return ``None`` (may be a metadata-only distribution). """ compliant_filename = get_sigmf_filenames(meta_fn)["data_fn"] - noncompliant_filename = metadata["global"].get(SigMFFile.DATASET_KEY, None) + noncompliant_filename = metadata["global"].get(keys.DATASET_KEY, None) if noncompliant_filename: if Path.is_file(compliant_filename): warnings.warn( - f"{SigMFFile.DATASET_KEY} is defined but compliant dataset `{compliant_filename}` exists; " - f"using `{noncompliant_filename}` specified by {SigMFFile.DATASET_KEY}" + f"{keys.DATASET_KEY} is defined but compliant dataset `{compliant_filename}` exists; " + f"using `{noncompliant_filename}` specified by {keys.DATASET_KEY}" ) dir_path = Path(meta_fn).parent noncompliant_data_file_path = Path.joinpath(dir_path, noncompliant_filename) if Path.is_file(noncompliant_data_file_path): - if metadata["global"].get(SigMFFile.METADATA_ONLY_KEY, False): + if metadata["global"].get(keys.METADATA_ONLY_KEY, False): raise SigMFFileError( - f"Schema defines {SigMFFile.DATASET_KEY} " - f"but {SigMFFile.METADATA_ONLY_KEY} also exists; using `{noncompliant_filename}`" + f"Schema defines {keys.DATASET_KEY} " + f"but {keys.METADATA_ONLY_KEY} also exists; using `{noncompliant_filename}`" ) return noncompliant_data_file_path else: raise SigMFFileError( - f"Non-Compliant Dataset `{noncompliant_filename}` is specified in {SigMFFile.DATASET_KEY} " + f"Non-Compliant Dataset `{noncompliant_filename}` is specified in {keys.DATASET_KEY} " "but does not exist!" ) elif Path.is_file(compliant_filename): @@ -1352,15 +1368,15 @@ def fromarray(data, sample_rate, frequency=None, global_info=None): # build metadata info = { - SigMFFile.DATATYPE_KEY: get_data_type_str(data), - SigMFFile.SAMPLE_RATE_KEY: sample_rate, + keys.DATATYPE_KEY: get_data_type_str(data), + keys.SAMPLE_RATE_KEY: sample_rate, } if global_info is not None: info.update(global_info) capture_meta = None if frequency is not None: - capture_meta = {SigMFFile.FREQUENCY_KEY: frequency} + capture_meta = {keys.FREQUENCY_KEY: frequency} # create sigmffile object with in-memory buffer meta = SigMFFile(global_info=info) @@ -1521,13 +1537,7 @@ def get_sigmf_filenames(filename): # If the path has a sigmf suffix, remove it. Otherwise do not remove the # suffix, because the filename might contain '.' characters which are part # of the filename rather than an extension. - sigmf_suffixes = [ - SIGMF_DATASET_EXT, - SIGMF_METADATA_EXT, - SIGMF_ARCHIVE_EXT, - SIGMF_COLLECTION_EXT, - ] - if stem_path.suffix in sigmf_suffixes: + if stem_path.suffix in keys.SIGMF_SUFFIXES: with_suffix_path = stem_path stem_path = stem_path.with_suffix("") else: diff --git a/sigmf/validate.py b/sigmf/validate.py index b849455..cc35865 100644 --- a/sigmf/validate.py +++ b/sigmf/validate.py @@ -24,12 +24,12 @@ import jsonschema from . import __version__ as toolversion -from . import error, schema, sigmffile +from . import error, keys, schema, sigmffile def _get_namespaces_declared(metadata: dict) -> set: """Get set of declared extension namespaces.""" - extensions = metadata.get("global", {}).get(sigmffile.SigMFFile.EXTENSIONS_KEY, []) + extensions = metadata.get("global", {}).get(sigmffile.keys.EXTENSIONS_KEY, []) return {ext["name"].split(":")[0] for ext in extensions} @@ -81,7 +81,7 @@ def validate(metadata, ref_schema=schema.get_schema()) -> None: if undeclared: warnings.warn( f"Found undeclared extensions in use: {', '.join(sorted(undeclared))}. " - f"All extensions should be declared in {sigmffile.SigMFFile.EXTENSIONS_KEY}. " + f"All extensions should be declared in {sigmffile.keys.EXTENSIONS_KEY}. " "This will raise a ValidationError in future versions.", DeprecationWarning, stacklevel=2, @@ -91,7 +91,7 @@ def validate(metadata, ref_schema=schema.get_schema()) -> None: for key in ["captures", "annotations"]: count = -1 for item in metadata[key]: - new_count = item[sigmffile.SigMFFile.START_INDEX_KEY] + new_count = item[sigmffile.keys.SAMPLE_START_KEY] if new_count < count: raise jsonschema.exceptions.ValidationError(f"{key} has incorrect sample start ordering.") count = new_count diff --git a/tests/conftest.py b/tests/conftest.py index a5379ef..d9599ad 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,8 +10,7 @@ import pytest -from sigmf import __specification__ -from sigmf.archive import SIGMF_DATASET_EXT +from sigmf import DATATYPE_KEY, VERSION_KEY, __specification__ from sigmf.sigmffile import SigMFFile from .testdata import TEST_FLOAT32_DATA, TEST_METADATA @@ -20,7 +19,7 @@ @pytest.fixture def test_data_file(): """when called, yields temporary dataset""" - with tempfile.NamedTemporaryFile(suffix=f".{SIGMF_DATASET_EXT}") as temp: + with tempfile.NamedTemporaryFile(suffix=".sigmf-data") as temp: TEST_FLOAT32_DATA.tofile(temp.name) yield temp @@ -29,8 +28,8 @@ def test_data_file(): def test_sigmffile(test_data_file): """If pytest uses this signature, will return valid SigMF file.""" meta = SigMFFile() - meta.set_global_field("core:datatype", "rf32_le") - meta.set_global_field("core:version", __specification__) + meta.set_global_field(DATATYPE_KEY, "rf32_le") + meta.set_global_field(VERSION_KEY, __specification__) meta.add_annotation(start_index=0, length=len(TEST_FLOAT32_DATA)) meta.add_capture(start_index=0) meta.set_data_file(test_data_file.name) diff --git a/tests/test_archive.py b/tests/test_archive.py index 547a5c9..026edc8 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -18,8 +18,7 @@ import jsonschema import numpy as np -from sigmf import SigMFFile, __specification__, error, fromfile -from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT +from sigmf import DATATYPE_KEY, SigMFFile, __specification__, error, fromfile from sigmf.archivereader import SigMFArchiveReader from .testdata import TEST_FLOAT32_DATA, TEST_METADATA @@ -113,7 +112,7 @@ def test_archive_files_have_correct_names_and_extensions(self): basedir, file1, file2 = self.sigmf_tarfile.getmembers() archive_name = basedir.name self.assertEqual(archive_name, Path(self.temp_path_archive).stem) - file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT} + file_extensions = {".sigmf-data", ".sigmf-meta"} file1_name, file1_ext = Path(file1.name).stem, Path(file1.name).suffix self.assertEqual(file1_name, archive_name) @@ -135,7 +134,7 @@ def test_archive_files_have_correct_permissions(self): def test_archive_contents_match_original_data(self): """Test archive contents""" _, file1, file2 = self.sigmf_tarfile.getmembers() - if file1.name.endswith(SIGMF_METADATA_EXT): + if file1.name.endswith(".sigmf-meta"): mdfile = file1 datfile = file2 else: @@ -201,12 +200,12 @@ def test_roundtrip_all_formats(self): path = self.temp_dir / f"test.{ext}" self.sigmf_object.archive(name=path, overwrite=True) self.assertTrue(path.exists()) - readback = fromfile(str(path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(path) + np.testing.assert_array_equal(self.original_samples, loopback[:]) # verify metadata preserved self.assertEqual( - self.sigmf_object.get_global_field(SigMFFile.DATATYPE_KEY), - readback.get_global_field(SigMFFile.DATATYPE_KEY), + self.sigmf_object.get_global_field(DATATYPE_KEY), + loopback.get_global_field(DATATYPE_KEY), ) def test_compressed_smaller_than_uncompressed(self): @@ -230,8 +229,8 @@ def test_explicit_compression_param(self): self.sigmf_object.archive(name=path, compression="gz", overwrite=True) expected = self.temp_dir / "foo.sigmf.gz" self.assertTrue(expected.exists()) - readback = fromfile(str(expected)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(expected) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_invalid_compression_raises_error(self): """invalid compression type raises error""" @@ -246,7 +245,7 @@ def test_mismatched_extension_and_compression_raises_error(self): with self.assertRaises(error.SigMFFileError): self.sigmf_object.archive(name=path, compression="xz", overwrite=True) with self.assertRaises(error.SigMFFileError): - self.sigmf_object.tofile(str(path), compression="xz", overwrite=True) + self.sigmf_object.tofile(path, compression="xz", overwrite=True) def test_uncompressed_archive_uses_memmap(self): """uncompressed archives use memmap for data access""" @@ -258,39 +257,39 @@ def test_uncompressed_archive_uses_memmap(self): def test_tofile_sigmf_ext(self): """tofile() with .sigmf extension creates archive""" path = self.temp_dir / "foo.sigmf" - self.sigmf_object.tofile(str(path), overwrite=True) + self.sigmf_object.tofile(path, overwrite=True) self.assertTrue(path.exists()) self.assertFalse((self.temp_dir / "foo.sigmf-meta").exists()) - readback = fromfile(str(path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(path) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_tofile_compressed_ext(self): """tofile() with compressed extensions creates compressed archives""" for ext, name in [("gz", "bar"), ("xz", "baz"), ("zip", "qux")]: path = self.temp_dir / f"{name}.sigmf.{ext}" - self.sigmf_object.tofile(str(path), overwrite=True) + self.sigmf_object.tofile(path, overwrite=True) self.assertTrue(path.exists()) self.assertFalse((self.temp_dir / f"{name}.sigmf.{ext}.sigmf-meta").exists()) - readback = fromfile(str(path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(path) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_tofile_explicit_compression(self): """tofile() with explicit compression parameter adds correct extension""" path = self.temp_dir / "foo" - self.sigmf_object.tofile(str(path), compression="xz", overwrite=True) + self.sigmf_object.tofile(path, compression="xz", overwrite=True) expected = self.temp_dir / "foo.sigmf.xz" self.assertTrue(expected.exists()) self.assertFalse((self.temp_dir / "foo.sigmf").exists()) - readback = fromfile(str(expected)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(expected) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_archive_sigmf_ext(self): """archive() with .sigmf extension creates archive""" path = self.temp_dir / "bar.sigmf" self.sigmf_object.archive(name=path, overwrite=True) self.assertTrue(path.exists()) - readback = fromfile(str(path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(path) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_archive_compressed_ext(self): """archive() with compressed extensions creates compressed archives""" @@ -298,8 +297,8 @@ def test_archive_compressed_ext(self): path = self.temp_dir / f"{name}.sigmf.{ext}" self.sigmf_object.archive(name=path, overwrite=True) self.assertTrue(path.exists()) - readback = fromfile(str(path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(path) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_archive_explicit_compression(self): """archive() with explicit compression parameter adds correct extension""" @@ -307,8 +306,8 @@ def test_archive_explicit_compression(self): self.sigmf_object.archive(name=path, compression="xz", overwrite=True) expected = self.temp_dir / "qux.sigmf.xz" self.assertTrue(expected.exists()) - readback = fromfile(str(expected)) - np.testing.assert_array_equal(self.original_samples, readback[:]) + loopback = fromfile(expected) + np.testing.assert_array_equal(self.original_samples, loopback[:]) def test_data_buffer_writes_data_file(self): """tofile() with data_buffer writes both metadata and data files""" @@ -324,7 +323,7 @@ def test_data_buffer_writes_data_file(self): # tofile without archive extension should create separate files path = self.temp_dir / "generated" - meta.tofile(str(path), overwrite=True) + meta.tofile(path, overwrite=True) # should create both .sigmf-meta and .sigmf-data expected_meta = self.temp_dir / "generated.sigmf-meta" @@ -333,5 +332,5 @@ def test_data_buffer_writes_data_file(self): self.assertTrue(expected_data.exists()) # verify data roundtrips correctly - readback = fromfile(str(path)) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) + loopback = fromfile(path) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, loopback[:]) diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index fc945ec..b39c5bb 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -11,8 +11,7 @@ import numpy as np -import sigmf -from sigmf import SigMFArchiveReader, SigMFFile, __specification__ +from sigmf import DATATYPE_KEY, NUM_CHANNELS_KEY, SigMFArchiveReader, SigMFFile, __specification__, fromfile class TestArchiveReader(unittest.TestCase): @@ -48,25 +47,25 @@ def test_access_data_without_untar(self): temp_meta = SigMFFile( data_file=temp_data.name, global_info={ - SigMFFile.DATATYPE_KEY: f"{complex_prefix}{key}_le", - SigMFFile.NUM_CHANNELS_KEY: num_channels, + DATATYPE_KEY: f"{complex_prefix}{key}_le", + NUM_CHANNELS_KEY: num_channels, }, ) temp_meta.tofile(temp_archive.name, overwrite=True) - readback = SigMFArchiveReader(temp_archive.name) - readback_samples = readback[:] + loopback = SigMFArchiveReader(temp_archive.name) + loopback_samples = loopback[:] if complex_prefix == "c": # complex data will be half as long target_count //= 2 - self.assertTrue(np.iscomplexobj(readback_samples)) + self.assertTrue(np.iscomplexobj(loopback_samples)) if num_channels != 1: # check expected # of channels self.assertEqual( - readback_samples.ndim, + loopback_samples.ndim, 2, - "Mismatch in shape of readback samples.", + "Mismatch in shape of loopback samples.", ) target_count //= num_channels @@ -77,8 +76,8 @@ def test_access_data_without_untar(self): ) self.assertEqual( target_count, - len(readback), - "Mismatch in expected readback length", + len(loopback), + "Mismatch in expected loopback length", ) @@ -86,7 +85,7 @@ def test_archiveread_data_file_unchanged(test_sigmffile): with NamedTemporaryFile(suffix=".sigmf") as temp_file: input_samples = test_sigmffile.read_samples() test_sigmffile.archive(temp_file.name, overwrite=True) - arc = sigmf.fromfile(temp_file.name) + arc = fromfile(temp_file.name) output_samples = arc.read_samples() assert np.array_equal(input_samples, output_samples) diff --git a/tests/test_attributes.py b/tests/test_attributes.py index f6a1f61..67f84a6 100644 --- a/tests/test_attributes.py +++ b/tests/test_attributes.py @@ -5,6 +5,7 @@ import numpy as np +import sigmf from sigmf import SigMFFile from sigmf.error import SigMFAccessError @@ -25,17 +26,14 @@ def setUp(self): def test_getter_existing_fields(self): """test attribute getters for existing core fields""" # test common core fields - # self.assertEqual(self.meta.sample_rate, self.meta.get_global_field(SigMFFile.SAMPLE_RATE_KEY)) - # self.assertEqual(self.meta.author, self.meta.get_global_field(SigMFFile.AUTHOR_KEY)) - self.assertEqual(self.meta.datatype, self.meta.get_global_field(SigMFFile.DATATYPE_KEY)) - self.assertEqual(self.meta.sha512, self.meta.get_global_field(SigMFFile.HASH_KEY)) - # self.assertEqual(self.meta.description, self.meta.get_global_field(SigMFFile.DESCRIPTION_KEY)) + self.assertEqual(self.meta.datatype, self.meta.get_global_field(sigmf.DATATYPE_KEY)) + self.assertEqual(self.meta.sha512, self.meta.get_global_field(sigmf.SHA512_KEY)) def test_getter_missing_core_fields(self): """test that getter raises SigMFAccessError for missing core fields""" with self.assertRaises(SigMFAccessError) as context: _ = self.meta.license - self.assertIn(SigMFFile.LICENSE_KEY, str(context.exception)) + self.assertIn(sigmf.LICENSE_KEY, str(context.exception)) def test_getter_nonexistent_attribute(self): """test that getter raises AttributeError for non-core attributes""" @@ -74,12 +72,12 @@ def test_setter_noncore_attributes(self): def test_method_vs_attribute_equivalence(self): """test that method-based and attribute-based access are equivalent""" # set via method, access via attribute - self.meta.set_global_field(SigMFFile.LICENSE_KEY, SOME_LICENSE) + self.meta.set_global_field(sigmf.LICENSE_KEY, SOME_LICENSE) self.assertEqual(self.meta.license, SOME_LICENSE) # set via attribute, access via method self.meta.recorder = SOME_RECORDER - self.assertEqual(self.meta.get_global_field(SigMFFile.RECORDER_KEY), SOME_RECORDER) + self.assertEqual(self.meta.get_global_field(sigmf.RECORDER_KEY), SOME_RECORDER) def test_private_attributes_unaffected(self): """test that private attributes work normally""" diff --git a/tests/test_collection.py b/tests/test_collection.py index 72b4e5a..0a4ee7e 100644 --- a/tests/test_collection.py +++ b/tests/test_collection.py @@ -17,7 +17,6 @@ from hypothesis import given from hypothesis import strategies as st -from sigmf.archive import SIGMF_COLLECTION_EXT, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT from sigmf.sigmffile import SigMFCollection, SigMFFile, fromfile from .testdata import TEST_FLOAT32_DATA, TEST_METADATA @@ -37,11 +36,11 @@ def tearDown(self): @given(st.sampled_from([".", "subdir/", "sub0/sub1/sub2/"])) def test_load_collection(self, subdir: str) -> None: """test path handling for collections""" - data_name1 = "dat1" + SIGMF_DATASET_EXT - data_name2 = "dat2" + SIGMF_DATASET_EXT - meta_name1 = "dat1" + SIGMF_METADATA_EXT - meta_name2 = "dat2" + SIGMF_METADATA_EXT - collection_name = "collection" + SIGMF_COLLECTION_EXT + data_name1 = "dat1.sigmf-data" + data_name2 = "dat2.sigmf-data" + meta_name1 = "dat1.sigmf-meta" + meta_name2 = "dat2.sigmf-meta" + collection_name = "collection.sigmf-collection" data_path1 = self.temp_dir / subdir / data_name1 data_path2 = self.temp_dir / subdir / data_name2 meta_path1 = self.temp_dir / subdir / meta_name1 diff --git a/tests/test_hashing.py b/tests/test_hashing.py index 0c5ca54..835c1e3 100644 --- a/tests/test_hashing.py +++ b/tests/test_hashing.py @@ -16,7 +16,8 @@ import numpy as np -from sigmf import SigMFFile, hashing +import sigmf +from sigmf import SigMFFile, TRAILING_BYTES_KEY, hashing from .testdata import TEST_FLOAT32_DATA, TEST_METADATA @@ -43,13 +44,13 @@ def test_ncd_hash_covers_entire_file(self): # Create SigMF metadata for NCD ncd_metadata = deepcopy(TEST_METADATA) - del ncd_metadata["global"][SigMFFile.HASH_KEY] - ncd_metadata["global"][SigMFFile.TRAILING_BYTES_KEY] = 32 + del ncd_metadata["global"][sigmf.SHA512_KEY] + ncd_metadata["global"][TRAILING_BYTES_KEY] = 32 meta = SigMFFile(metadata=ncd_metadata) meta.set_data_file(data_path, offset=64) file_hash = hashing.calculate_sha512(filename=data_path) - sigmf_hash = meta.get_global_field(SigMFFile.HASH_KEY) + sigmf_hash = meta.get_global_field(sigmf.SHA512_KEY) self.assertEqual(file_hash, sigmf_hash) def test_edge_cases(self): diff --git a/tests/test_ncd.py b/tests/test_ncd.py index b6c6959..6f5fc5b 100644 --- a/tests/test_ncd.py +++ b/tests/test_ncd.py @@ -17,6 +17,7 @@ from hypothesis import given from hypothesis import strategies as st +import sigmf from sigmf.error import SigMFFileError from sigmf.sigmffile import SigMFFile, fromfile @@ -80,11 +81,11 @@ def test_ncd_priority_over_conforming_dataset(self) -> None: # create metadata that references the ncd file ncd_metadata = copy.deepcopy(TEST_METADATA) - ncd_metadata[SigMFFile.GLOBAL_KEY][SigMFFile.DATASET_KEY] = f"{base_name}.fleeb" - ncd_metadata[SigMFFile.GLOBAL_KEY][SigMFFile.NUM_CHANNELS_KEY] = 1 - ncd_metadata[SigMFFile.GLOBAL_KEY][SigMFFile.DATATYPE_KEY] = "rf32_le" - ncd_metadata[SigMFFile.GLOBAL_KEY].pop(SigMFFile.HASH_KEY, None) - ncd_metadata[SigMFFile.ANNOTATION_KEY] = [{SigMFFile.LENGTH_INDEX_KEY: 4, SigMFFile.START_INDEX_KEY: 0}] + ncd_metadata[SigMFFile.GLOBAL_KEY][sigmf.DATASET_KEY] = f"{base_name}.fleeb" + ncd_metadata[SigMFFile.GLOBAL_KEY][sigmf.NUM_CHANNELS_KEY] = 1 + ncd_metadata[SigMFFile.GLOBAL_KEY][sigmf.DATATYPE_KEY] = "rf32_le" + ncd_metadata[SigMFFile.GLOBAL_KEY].pop(sigmf.SHA512_KEY, None) + ncd_metadata[SigMFFile.ANNOTATION_KEY] = [{sigmf.SAMPLE_COUNT_KEY: 4, sigmf.SAMPLE_START_KEY: 0}] # write metadata file meta = SigMFFile(metadata=ncd_metadata) diff --git a/tests/test_siggen.py b/tests/test_siggen.py index 489dcd1..29b74e5 100644 --- a/tests/test_siggen.py +++ b/tests/test_siggen.py @@ -11,6 +11,7 @@ import numpy as np import numpy.testing as npt +import sigmf from sigmf import SigMFFile from sigmf.error import SigMFGeneratorError from sigmf.siggen import SigMFGenerator @@ -66,7 +67,7 @@ def test_reproducible(self): # set capture datetime identical for sig in [signal0, signal1, signal2]: - sig.add_capture(0, {SigMFFile.DATETIME_KEY: "2026-01-01T00:00:00Z"}) + sig.add_capture(0, {sigmf.DATETIME_KEY: "2026-01-01T00:00:00Z"}) # compare metadata (which includes checksums) self.assertEqual(signal0, signal1) @@ -119,7 +120,7 @@ def test_nominal_chaining(self): ) # verify chaining worked - self.assertEqual(signal.get_global_info()[SigMFFile.AUTHOR_KEY], "test@example.com") + self.assertEqual(signal.get_global_info()[sigmf.AUTHOR_KEY], "test@example.com") self.assertEqual(signal.description, "test signal") # should have multiple annotations: main signal + noise + freq offset + phase offset @@ -127,15 +128,15 @@ def test_nominal_chaining(self): self.assertGreaterEqual(len(annotations), 3) # at least main + noise + offsets # find main signal annotation (has comment) - main_annotation = next(ann for ann in annotations if SigMFFile.COMMENT_KEY in ann) - self.assertEqual(main_annotation[SigMFFile.COMMENT_KEY], "test comment") + main_annotation = next(ann for ann in annotations if sigmf.COMMENT_KEY in ann) + self.assertEqual(main_annotation[sigmf.COMMENT_KEY], "test comment") # verify there's a noise annotation - noise_annotations = [ann for ann in annotations if "AWGN" in ann.get(SigMFFile.LABEL_KEY, "")] + noise_annotations = [ann for ann in annotations if "AWGN" in ann.get(sigmf.LABEL_KEY, "")] self.assertEqual(len(noise_annotations), 1) # verify there's a frequency offset annotation - freq_offset_annotations = [ann for ann in annotations if "freq offset" in ann.get(SigMFFile.LABEL_KEY, "")] + freq_offset_annotations = [ann for ann in annotations if "freq offset" in ann.get(sigmf.LABEL_KEY, "")] self.assertEqual(len(freq_offset_annotations), 1) def test_snr_noise_addition(self): @@ -160,7 +161,7 @@ def test_frequency_offset(self): # verify frequency in capture metadata includes offset captures = signal.get_captures() - self.assertEqual(captures[0][SigMFFile.FREQUENCY_KEY], base_freq + offset_freq) + self.assertEqual(captures[0][sigmf.FREQUENCY_KEY], base_freq + offset_freq) def test_metadata_completeness(self): """test that generated metadata is complete and valid""" @@ -169,12 +170,12 @@ def test_metadata_completeness(self): # verify required global fields global_info = signal.get_global_info() required_keys = [ - SigMFFile.DATATYPE_KEY, - SigMFFile.SAMPLE_RATE_KEY, - SigMFFile.VERSION_KEY, - SigMFFile.NUM_CHANNELS_KEY, - SigMFFile.RECORDER_KEY, - SigMFFile.DESCRIPTION_KEY, + sigmf.DATATYPE_KEY, + sigmf.SAMPLE_RATE_KEY, + sigmf.VERSION_KEY, + sigmf.NUM_CHANNELS_KEY, + sigmf.RECORDER_KEY, + sigmf.DESCRIPTION_KEY, ] for key in required_keys: @@ -183,8 +184,8 @@ def test_metadata_completeness(self): # verify captures exist captures = signal.get_captures() self.assertEqual(len(captures), 1) - self.assertIn(SigMFFile.START_INDEX_KEY, captures[0]) - self.assertIn(SigMFFile.DATETIME_KEY, captures[0]) + self.assertIn(sigmf.SAMPLE_START_KEY, captures[0]) + self.assertIn(sigmf.DATETIME_KEY, captures[0]) # should be valid sigmf signal.validate() @@ -192,11 +193,11 @@ def test_metadata_completeness(self): def test_recorder_info(self): """test that recorder metadata includes seed when provided and excludes it when not""" with_seed = SigMFGenerator(seed=self.seed).generate() - recorder_info = with_seed.get_global_info()[SigMFFile.RECORDER_KEY] + recorder_info = with_seed.get_global_info()[sigmf.RECORDER_KEY] self.assertIn(f"seed={self.seed:#x}", recorder_info) without_seed = SigMFGenerator().generate() - recorder_info = without_seed.get_global_info()[SigMFFile.RECORDER_KEY] + recorder_info = without_seed.get_global_info()[sigmf.RECORDER_KEY] self.assertNotIn("seed=", recorder_info) def test_data_buffer_creation(self): @@ -246,28 +247,28 @@ def test_automatic_annotations(self): self.assertEqual(len(annotations), 3) # find and verify main tone annotation - tone_annotation = next(ann for ann in annotations if "tone at 1000 Hz" in ann.get(SigMFFile.LABEL_KEY, "")) + tone_annotation = next(ann for ann in annotations if "tone at 1000 Hz" in ann.get(sigmf.LABEL_KEY, "")) # with temporal windowing, start index can be any valid sample index - self.assertGreaterEqual(tone_annotation[SigMFFile.START_INDEX_KEY], 0) - self.assertLess(tone_annotation[SigMFFile.START_INDEX_KEY], 48000 * 0.1) # less than total samples - self.assertEqual(tone_annotation[SigMFFile.GENERATOR_KEY], "SigMFGenerator") - self.assertIn(SigMFFile.FLO_KEY, tone_annotation) - self.assertIn(SigMFFile.FHI_KEY, tone_annotation) - self.assertEqual(tone_annotation[SigMFFile.COMMENT_KEY], "test") + self.assertGreaterEqual(tone_annotation[sigmf.SAMPLE_START_KEY], 0) + self.assertLess(tone_annotation[sigmf.SAMPLE_START_KEY], 48000 * 0.1) # less than total samples + self.assertEqual(tone_annotation[sigmf.GENERATOR_KEY], "SigMFGenerator") + self.assertIn(sigmf.FREQ_LOWER_EDGE_KEY, tone_annotation) + self.assertIn(sigmf.FREQ_UPPER_EDGE_KEY, tone_annotation) + self.assertEqual(tone_annotation[sigmf.COMMENT_KEY], "test") # verify tone frequency edges account for offset (1000 + 200 = 1200 Hz center) - center_freq = (tone_annotation[SigMFFile.FLO_KEY] + tone_annotation[SigMFFile.FHI_KEY]) / 2 + center_freq = (tone_annotation[sigmf.FREQ_LOWER_EDGE_KEY] + tone_annotation[sigmf.FREQ_UPPER_EDGE_KEY]) / 2 self.assertAlmostEqual(center_freq, 1200.0, places=1) # find and verify noise annotation - noise_annotation = next(ann for ann in annotations if "AWGN" in ann.get(SigMFFile.LABEL_KEY, "")) - self.assertIn("15.0 dB SNR", noise_annotation[SigMFFile.LABEL_KEY]) - self.assertEqual(noise_annotation[SigMFFile.FLO_KEY], 0.0) - self.assertEqual(noise_annotation[SigMFFile.FHI_KEY], 24000.0) # nyquist + noise_annotation = next(ann for ann in annotations if "AWGN" in ann.get(sigmf.LABEL_KEY, "")) + self.assertIn("15.0 dB SNR", noise_annotation[sigmf.LABEL_KEY]) + self.assertEqual(noise_annotation[sigmf.FREQ_LOWER_EDGE_KEY], 0.0) + self.assertEqual(noise_annotation[sigmf.FREQ_UPPER_EDGE_KEY], 24000.0) # nyquist # find and verify frequency offset annotation - offset_annotation = next(ann for ann in annotations if "freq offset" in ann.get(SigMFFile.LABEL_KEY, "")) - self.assertIn("+200.0 Hz", offset_annotation[SigMFFile.LABEL_KEY]) + offset_annotation = next(ann for ann in annotations if "freq offset" in ann.get(sigmf.LABEL_KEY, "")) + self.assertIn("+200.0 Hz", offset_annotation[sigmf.LABEL_KEY]) def test_sweep_annotations(self): """test sweep annotations have correct frequency bounds including negative""" @@ -277,9 +278,9 @@ def test_sweep_annotations(self): self.assertEqual(len(annotations), 1) # just main sweep annotation sweep_annotation = annotations[0] - self.assertEqual(sweep_annotation[SigMFFile.FLO_KEY], -2500.0) - self.assertEqual(sweep_annotation[SigMFFile.FHI_KEY], 2500.0) - self.assertIn("sweep from -2500 to 2500 Hz", sweep_annotation[SigMFFile.LABEL_KEY]) + self.assertEqual(sweep_annotation[sigmf.FREQ_LOWER_EDGE_KEY], -2500.0) + self.assertEqual(sweep_annotation[sigmf.FREQ_UPPER_EDGE_KEY], 2500.0) + self.assertIn("sweep from -2500 to 2500 Hz", sweep_annotation[sigmf.LABEL_KEY]) def test_reverse_sweep_annotations(self): """test reverse sweep crossing DC has correct bounds""" @@ -289,10 +290,10 @@ def test_reverse_sweep_annotations(self): sweep_annotation = annotations[0] # frequency bounds should be min/max regardless of sweep direction - self.assertEqual(sweep_annotation[SigMFFile.FLO_KEY], -800.0) - self.assertEqual(sweep_annotation[SigMFFile.FHI_KEY], 3000.0) + self.assertEqual(sweep_annotation[sigmf.FREQ_LOWER_EDGE_KEY], -800.0) + self.assertEqual(sweep_annotation[sigmf.FREQ_UPPER_EDGE_KEY], 3000.0) # but label should show original order - self.assertIn("sweep from 3000 to -800 Hz", sweep_annotation[SigMFFile.LABEL_KEY]) + self.assertIn("sweep from 3000 to -800 Hz", sweep_annotation[sigmf.LABEL_KEY]) def test_minimal_annotations(self): """test that simple signals get minimal but complete annotations""" @@ -303,10 +304,10 @@ def test_minimal_annotations(self): annotation = annotations[0] # with temporal windowing, start index can be any valid sample index - self.assertGreaterEqual(annotation[SigMFFile.START_INDEX_KEY], 0) - self.assertIn(SigMFFile.LENGTH_INDEX_KEY, annotation) - self.assertIn(SigMFFile.GENERATOR_KEY, annotation) - self.assertIn("tone at 440 Hz", annotation[SigMFFile.LABEL_KEY]) + self.assertGreaterEqual(annotation[sigmf.SAMPLE_START_KEY], 0) + self.assertIn(sigmf.SAMPLE_COUNT_KEY, annotation) + self.assertIn(sigmf.GENERATOR_KEY, annotation) + self.assertIn("tone at 440 Hz", annotation[sigmf.LABEL_KEY]) def test_phase_offset(self): """test phase offset functionality""" @@ -317,8 +318,8 @@ def test_phase_offset(self): signal_1 = SigMFGenerator(seed=42).tone().phase_offset(phase_offset).generate() # tone annotations are last after sorting (full-signal annotations start at 0) - start_idx_0 = signal_0.get_annotations()[-1][SigMFFile.START_INDEX_KEY] - start_idx_1 = signal_1.get_annotations()[-1][SigMFFile.START_INDEX_KEY] + start_idx_0 = signal_0.get_annotations()[-1][sigmf.SAMPLE_START_KEY] + start_idx_1 = signal_1.get_annotations()[-1][sigmf.SAMPLE_START_KEY] # both should start at the same sample index (same seed) self.assertEqual(start_idx_0, start_idx_1) diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index bbfd0cd..245f1e0 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -11,12 +11,14 @@ import shutil import tempfile import unittest +import warnings from pathlib import Path import numpy as np import sigmf from sigmf import SigMFFile, error, utils +from sigmf.sigmffile import _DeprecatingKey, _SigMFDeprecatingMeta from .testdata import * @@ -68,7 +70,7 @@ def test_checksum(self): """Ensure checksum fails when incorrect or empty string.""" for new_checksum in ("", "a", 0): bad_checksum_metadata = copy.deepcopy(TEST_METADATA) - bad_checksum_metadata[SigMFFile.GLOBAL_KEY][SigMFFile.HASH_KEY] = new_checksum + bad_checksum_metadata[SigMFFile.GLOBAL_KEY][sigmf.SHA512_KEY] = new_checksum with self.assertRaises(error.SigMFFileError): _ = SigMFFile(bad_checksum_metadata, self.temp_path_data) @@ -91,8 +93,8 @@ def test_get_annotations_with_index(self): self.assertListEqual( annotations_idx10, [ - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.LENGTH_INDEX_KEY: 16}, - {SigMFFile.START_INDEX_KEY: 1}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.SAMPLE_COUNT_KEY: 16}, + {sigmf.SAMPLE_START_KEY: 1}, ], ) @@ -171,8 +173,8 @@ def test_multichannel_types(self): temp_signal = SigMFFile( data_file=self.temp_path, global_info={ - SigMFFile.DATATYPE_KEY: f"{complex_prefix}{key}_le", - SigMFFile.NUM_CHANNELS_KEY: num_channels, + sigmf.DATATYPE_KEY: f"{complex_prefix}{key}_le", + sigmf.NUM_CHANNELS_KEY: num_channels, }, ) temp_samples = temp_signal.read_samples() @@ -194,8 +196,8 @@ def test_multichannel_seek(self): temp_signal = SigMFFile( data_file=self.temp_path, global_info={ - SigMFFile.DATATYPE_KEY: "cu16_le", - SigMFFile.NUM_CHANNELS_KEY: 3, + sigmf.DATATYPE_KEY: "cu16_le", + sigmf.NUM_CHANNELS_KEY: 3, }, autoscale=False, ) @@ -462,7 +464,7 @@ def test_metadata_overwrite_works(self): # create sigmf object with different data and metadata alt_sigmf = SigMFFile() - alt_sigmf.set_global_field(SigMFFile.DATATYPE_KEY, "rf32_le") + alt_sigmf.set_global_field(sigmf.DATATYPE_KEY, "rf32_le") alt_sigmf.set_global_field("core:description", "overwritten file") alt_sigmf.set_data_file(self.alt_data_path) @@ -492,7 +494,7 @@ def test_archive_overwrite_works(self): # create sigmf object with different data alt_sigmf = SigMFFile() - alt_sigmf.set_global_field(SigMFFile.DATATYPE_KEY, "rf32_le") + alt_sigmf.set_global_field(sigmf.DATATYPE_KEY, "rf32_le") alt_sigmf.set_global_field("core:description", "overwritten archive") alt_sigmf.set_data_file(self.alt_data_path) @@ -501,10 +503,10 @@ def test_archive_overwrite_works(self): self.assertTrue(self.test_archive_path.exists()) # verify by reading the archive content back - readback_sigmf = sigmf.fromarchive(self.test_archive_path) - new_checksum = readback_sigmf.get_global_field("core:sha512") + loopback_sigmf = sigmf.fromarchive(self.test_archive_path) + new_checksum = loopback_sigmf.get_global_field("core:sha512") - self.assertEqual(readback_sigmf.get_global_field("core:description"), "overwritten archive") + self.assertEqual(loopback_sigmf.get_global_field("core:description"), "overwritten archive") self.assertNotEqual(original_checksum, new_checksum, "SHA512 checksum should change when overwritten") def test_default_behavior(self): @@ -533,8 +535,8 @@ def tearDown(self): def test_basic_creation(self): """test creating SigMFFile from array""" meta = sigmf.fromarray(TEST_FLOAT32_DATA, sample_rate=4000) - self.assertEqual(meta.get_global_field(SigMFFile.SAMPLE_RATE_KEY), 4000) - self.assertEqual(meta.get_global_field(SigMFFile.DATATYPE_KEY), "rf32_le") + self.assertEqual(meta.get_global_field(sigmf.SAMPLE_RATE_KEY), 4000) + self.assertEqual(meta.get_global_field(sigmf.DATATYPE_KEY), "rf32_le") np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) def test_with_frequency(self): @@ -549,8 +551,8 @@ def test_write_separate_files(self): meta.tofile(str(path)) self.assertTrue((self.temp_dir / "basic.sigmf-data").exists()) self.assertTrue((self.temp_dir / "basic.sigmf-meta").exists()) - readback = sigmf.fromfile(str(path)) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) + loopback = sigmf.fromfile(str(path)) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, loopback[:]) def test_write_archive(self): """test writing to uncompressed archive""" @@ -560,8 +562,8 @@ def test_write_archive(self): self.assertTrue((self.temp_dir / "archived.sigmf").exists()) self.assertFalse((self.temp_dir / "archived.sigmf-data").exists()) self.assertFalse((self.temp_dir / "archived.sigmf-meta").exists()) - readback = sigmf.fromfile(str(path)) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) + loopback = sigmf.fromfile(str(path)) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, loopback[:]) def test_write_compressed_archive(self): """test writing to compressed archive""" @@ -571,8 +573,8 @@ def test_write_compressed_archive(self): self.assertTrue((self.temp_dir / "comp.sigmf.xz").exists()) self.assertFalse((self.temp_dir / "comp.sigmf-data").exists()) self.assertFalse((self.temp_dir / "comp.sigmf-meta").exists()) - readback = sigmf.fromfile(str(path)) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) + loopback = sigmf.fromfile(str(path)) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, loopback[:]) def test_with_global_info(self): """test that global_info dict is merged into metadata""" diff --git a/tests/test_validation.py b/tests/test_validation.py index fe6d278..9029f50 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -84,7 +84,7 @@ def setUp(self): def test_no_version(self): """version key must be present""" meta = SigMFFile(copy.deepcopy(self.metadata)) - del meta._metadata[SigMFFile.GLOBAL_KEY][SigMFFile.VERSION_KEY] + del meta._metadata[SigMFFile.GLOBAL_KEY][sigmf.VERSION_KEY] with self.assertRaises(ValidationError): meta.validate() @@ -96,13 +96,13 @@ def test_extra_top_level_key(self): def test_invalid_type(self): """license key must be string""" - self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.LICENSE_KEY] = 1 + self.metadata[SigMFFile.GLOBAL_KEY][sigmf.LICENSE_KEY] = 1 with self.assertRaises(ValidationError): SigMFFile(self.metadata).validate() def test_invalid_capture_order(self): """metadata must have captures in order""" - self.metadata[SigMFFile.CAPTURE_KEY] = [{SigMFFile.START_INDEX_KEY: 10}, {SigMFFile.START_INDEX_KEY: 9}] + self.metadata[SigMFFile.CAPTURE_KEY] = [{sigmf.SAMPLE_START_KEY: 10}, {sigmf.SAMPLE_START_KEY: 9}] with self.assertRaises(ValidationError): SigMFFile(self.metadata).validate() @@ -110,12 +110,12 @@ def test_invalid_annotation_order(self): """metadata must have annotations in order""" self.metadata[SigMFFile.ANNOTATION_KEY] = [ { - SigMFFile.START_INDEX_KEY: 2, - SigMFFile.LENGTH_INDEX_KEY: 120000, + sigmf.SAMPLE_START_KEY: 2, + sigmf.SAMPLE_COUNT_KEY: 120000, }, { - SigMFFile.START_INDEX_KEY: 1, - SigMFFile.LENGTH_INDEX_KEY: 120000, + sigmf.SAMPLE_START_KEY: 1, + sigmf.SAMPLE_COUNT_KEY: 120000, }, ] with self.assertRaises(ValidationError): @@ -123,14 +123,14 @@ def test_invalid_annotation_order(self): def test_annotation_without_sample_count(self): """annotation without length should be accepted""" - self.metadata[SigMFFile.ANNOTATION_KEY] = [{SigMFFile.START_INDEX_KEY: 2}] + self.metadata[SigMFFile.ANNOTATION_KEY] = [{sigmf.SAMPLE_START_KEY: 2}] SigMFFile(self.metadata).validate() def test_invalid_hash(self): """wrong hash raises error on creation""" with tempfile.NamedTemporaryFile() as temp_file: TEST_FLOAT32_DATA.tofile(temp_file.name) - self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.HASH_KEY] = "derp" + self.metadata[SigMFFile.GLOBAL_KEY][sigmf.SHA512_KEY] = "derp" with self.assertRaises(sigmf.error.SigMFFileError): SigMFFile(metadata=self.metadata, data_file=temp_file.name) @@ -151,7 +151,7 @@ def test_declared_namespace(self): """known namespace should not raise a warning""" self.metadata[SigMFFile.GLOBAL_KEY]["other_namespace:key"] = 0 # define other_namespace - self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.EXTENSIONS_KEY] = [ + self.metadata[SigMFFile.GLOBAL_KEY][sigmf.EXTENSIONS_KEY] = [ { "name": "other_namespace", "version": "0.0.1", diff --git a/tests/testdata.py b/tests/testdata.py index 2e93b99..f34e4a3 100644 --- a/tests/testdata.py +++ b/tests/testdata.py @@ -12,6 +12,7 @@ import numpy as np +import sigmf from sigmf import SigMFFile, __specification__, __version__ @@ -48,14 +49,14 @@ def validate_ncd(test: unittest.TestCase, meta: SigMFFile, target_path: Path): TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) TEST_METADATA = { - SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], - SigMFFile.CAPTURE_KEY: [{SigMFFile.START_INDEX_KEY: 0}], + SigMFFile.ANNOTATION_KEY: [{sigmf.SAMPLE_COUNT_KEY: 16, sigmf.SAMPLE_START_KEY: 0}], + SigMFFile.CAPTURE_KEY: [{sigmf.SAMPLE_START_KEY: 0}], SigMFFile.GLOBAL_KEY: { - SigMFFile.DATATYPE_KEY: "rf32_le", - SigMFFile.HASH_KEY: "f4984219b318894fa7144519185d1ae81ea721c6113243a52b51e444512a39d74cf41a4cec3c5d000bd7277cc71232c04d7a946717497e18619bdbe94bfeadd6", - SigMFFile.NUM_CHANNELS_KEY: 1, - SigMFFile.START_OFFSET_KEY: 0, - SigMFFile.VERSION_KEY: __specification__, + sigmf.DATATYPE_KEY: "rf32_le", + sigmf.SHA512_KEY: "f4984219b318894fa7144519185d1ae81ea721c6113243a52b51e444512a39d74cf41a4cec3c5d000bd7277cc71232c04d7a946717497e18619bdbe94bfeadd6", + sigmf.NUM_CHANNELS_KEY: 1, + sigmf.OFFSET_KEY: 0, + sigmf.VERSION_KEY: __specification__, }, } @@ -64,53 +65,53 @@ def validate_ncd(test: unittest.TestCase, meta: SigMFFile, target_path: Path): TEST_U8_META0 = { SigMFFile.ANNOTATION_KEY: [], SigMFFile.CAPTURE_KEY: [ - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.HEADER_BYTES_KEY: 0}, - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.HEADER_BYTES_KEY: 0}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.HEADER_BYTES_KEY: 0}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.HEADER_BYTES_KEY: 0}, ], # very strange..but technically legal? - SigMFFile.GLOBAL_KEY: {SigMFFile.DATATYPE_KEY: "ru8", SigMFFile.TRAILING_BYTES_KEY: 0}, + SigMFFile.GLOBAL_KEY: {sigmf.DATATYPE_KEY: "ru8", sigmf.TRAILING_BYTES_KEY: 0}, } # Data1 is a test of a two capture recording with header_bytes and trailing_bytes set TEST_U8_DATA1 = [0xFE] * 32 + list(range(192)) + [0xFF] * 32 TEST_U8_META1 = { SigMFFile.ANNOTATION_KEY: [], SigMFFile.CAPTURE_KEY: [ - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.HEADER_BYTES_KEY: 32}, - {SigMFFile.START_INDEX_KEY: 128}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.HEADER_BYTES_KEY: 32}, + {sigmf.SAMPLE_START_KEY: 128}, ], - SigMFFile.GLOBAL_KEY: {SigMFFile.DATATYPE_KEY: "ru8", SigMFFile.TRAILING_BYTES_KEY: 32}, + SigMFFile.GLOBAL_KEY: {sigmf.DATATYPE_KEY: "ru8", sigmf.TRAILING_BYTES_KEY: 32}, } # Data2 is a test of a two capture recording with multiple header_bytes set TEST_U8_DATA2 = [0xFE] * 32 + list(range(128)) + [0xFE] * 16 + list(range(128, 192)) + [0xFF] * 16 TEST_U8_META2 = { SigMFFile.ANNOTATION_KEY: [], SigMFFile.CAPTURE_KEY: [ - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.HEADER_BYTES_KEY: 32}, - {SigMFFile.START_INDEX_KEY: 128, SigMFFile.HEADER_BYTES_KEY: 16}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.HEADER_BYTES_KEY: 32}, + {sigmf.SAMPLE_START_KEY: 128, sigmf.HEADER_BYTES_KEY: 16}, ], - SigMFFile.GLOBAL_KEY: {SigMFFile.DATATYPE_KEY: "ru8", SigMFFile.TRAILING_BYTES_KEY: 16}, + SigMFFile.GLOBAL_KEY: {sigmf.DATATYPE_KEY: "ru8", sigmf.TRAILING_BYTES_KEY: 16}, } # Data3 is a test of a three capture recording with multiple header_bytes set TEST_U8_DATA3 = [0xFE] * 32 + list(range(128)) + [0xFE] * 32 + list(range(128, 192)) TEST_U8_META3 = { SigMFFile.ANNOTATION_KEY: [], SigMFFile.CAPTURE_KEY: [ - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.HEADER_BYTES_KEY: 32}, - {SigMFFile.START_INDEX_KEY: 32}, - {SigMFFile.START_INDEX_KEY: 128, SigMFFile.HEADER_BYTES_KEY: 32}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.HEADER_BYTES_KEY: 32}, + {sigmf.SAMPLE_START_KEY: 32}, + {sigmf.SAMPLE_START_KEY: 128, sigmf.HEADER_BYTES_KEY: 32}, ], - SigMFFile.GLOBAL_KEY: {SigMFFile.DATATYPE_KEY: "ru8"}, + SigMFFile.GLOBAL_KEY: {sigmf.DATATYPE_KEY: "ru8"}, } # Data4 is a two channel version of Data0 TEST_U8_DATA4 = [0xFE] * 32 + [y for y in list(range(96)) for i in [0, 1]] + [0xFF] * 32 TEST_U8_META4 = { SigMFFile.ANNOTATION_KEY: [], SigMFFile.CAPTURE_KEY: [ - {SigMFFile.START_INDEX_KEY: 0, SigMFFile.HEADER_BYTES_KEY: 32}, - {SigMFFile.START_INDEX_KEY: 64}, + {sigmf.SAMPLE_START_KEY: 0, sigmf.HEADER_BYTES_KEY: 32}, + {sigmf.SAMPLE_START_KEY: 64}, ], SigMFFile.GLOBAL_KEY: { - SigMFFile.DATATYPE_KEY: "ru8", - SigMFFile.TRAILING_BYTES_KEY: 32, - SigMFFile.NUM_CHANNELS_KEY: 2, + sigmf.DATATYPE_KEY: "ru8", + sigmf.TRAILING_BYTES_KEY: 32, + sigmf.NUM_CHANNELS_KEY: 2, }, }