diff --git a/imod/mf6/aggregate/aggregate_schemes.py b/imod/mf6/aggregate/aggregate_schemes.py index 67b03ae65..b83264813 100644 --- a/imod/mf6/aggregate/aggregate_schemes.py +++ b/imod/mf6/aggregate/aggregate_schemes.py @@ -1,4 +1,4 @@ -from typing import Callable +from typing import Any, Callable import numpy as np from pydantic.dataclasses import dataclass @@ -37,10 +37,10 @@ class RiverAggregationMethod(DataclassType): """ - stage: Callable = np.nanmean - conductance: Callable = np.nansum - bottom_elevation: Callable = np.nanmean - concentration: Callable = np.nanmean + stage: Callable[..., Any] = np.nanmean + conductance: Callable[..., Any] = np.nansum + bottom_elevation: Callable[..., Any] = np.nanmean + concentration: Callable[..., Any] = np.nanmean @dataclass(config=_CONFIG) @@ -65,9 +65,9 @@ class DrainageAggregationMethod(DataclassType): """ - elevation: Callable = np.nanmean - conductance: Callable = np.nansum - concentration: Callable = np.nanmean + elevation: Callable[..., Any] = np.nanmean + conductance: Callable[..., Any] = np.nansum + concentration: Callable[..., Any] = np.nanmean @dataclass(config=_CONFIG) @@ -92,9 +92,9 @@ class GeneralHeadBoundaryAggregationMethod(DataclassType): """ - head: Callable = np.nanmean - conductance: Callable = np.nansum - concentration: Callable = np.nanmean + head: Callable[..., Any] = np.nanmean + conductance: Callable[..., Any] = np.nansum + concentration: Callable[..., Any] = np.nanmean @dataclass(config=_CONFIG) @@ -118,5 +118,5 @@ class RechargeAggregationMethod(DataclassType): """ - rate: Callable = np.nansum - concentration: Callable = np.nanmean + rate: Callable[..., Any] = np.nansum + concentration: Callable[..., Any] = np.nanmean diff --git a/imod/mf6/ats.py b/imod/mf6/ats.py index 44c20c205..f108a6957 100644 --- a/imod/mf6/ats.py +++ b/imod/mf6/ats.py @@ -8,7 +8,7 @@ from imod.schemata import AllValueSchema, DimsSchema, DTypeSchema from imod.typing import GridDataset -_PeriodDataType: TypeAlias = dict[np.int64, list] +_PeriodDataType: TypeAlias = dict[np.int64, list[Any]] _PeriodDataVarNames: TypeAlias = tuple[str, str, str, str, str] @@ -208,7 +208,7 @@ def _get_render_dictionary( d["perioddata"] = perioddata return d - def _validate(self, schemata: dict, **kwargs): + def _validate(self, schemata: dict[str, Any], **kwargs): # Insert additional kwargs kwargs["dt_max"] = self["dt_max"] errors = super()._validate(schemata, **kwargs) diff --git a/imod/mf6/boundary_condition.py b/imod/mf6/boundary_condition.py index 7cf5f5387..d8861c8a6 100644 --- a/imod/mf6/boundary_condition.py +++ b/imod/mf6/boundary_condition.py @@ -1,7 +1,7 @@ import abc import pathlib from copy import copy, deepcopy -from typing import Mapping, Optional, Union +from typing import Any, MutableMapping, Optional, Union import numpy as np import xarray as xr @@ -69,13 +69,15 @@ class BoundaryCondition(Package, abc.ABC): not the array input which is used in :class:`Package`. """ - def __init__(self, allargs: Mapping[str, GridDataArray | float | int | bool | str]): + def __init__( + self, allargs: MutableMapping[str, GridDataArray | float | int | bool | str] + ): # Convert repeat_stress in dict to a xr.DataArray in the right shape if # necessary, which is required to merge it into the dataset. if "repeat_stress" in allargs.keys() and isinstance( allargs["repeat_stress"], dict ): - allargs["repeat_stress"] = get_repeat_stress(allargs["repeat_stress"]) # type: ignore + allargs["repeat_stress"] = get_repeat_stress(allargs["repeat_stress"]) # Call the Package constructor, this merges the arguments into a dataset. super().__init__(allargs) if "concentration" in allargs.keys() and allargs["concentration"] is None: @@ -197,7 +199,9 @@ def _period_paths( return periods def _get_unfiltered_pkg_options( - self, predefined_options: dict, not_options: Optional[list] = None + self, + predefined_options: dict[str, Any], + not_options: Optional[list[str]] = None, ): options = copy(predefined_options) @@ -208,11 +212,13 @@ def _get_unfiltered_pkg_options( if varname in not_options: continue v = self.dataset[varname].values[()] - options[varname] = v + options[str(varname)] = v return options def _get_pkg_options( - self, predefined_options: dict, not_options: Optional[list] = None + self, + predefined_options: dict[str, Any], + not_options: Optional[list[str]] = None, ): unfiltered_options = self._get_unfiltered_pkg_options( predefined_options, not_options=not_options diff --git a/imod/mf6/evt.py b/imod/mf6/evt.py index 0fe54ed67..fdb2b603c 100644 --- a/imod/mf6/evt.py +++ b/imod/mf6/evt.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Any, Optional import numpy as np @@ -250,7 +250,9 @@ def _validate(self, schemata, **kwargs): return errors def _get_pkg_options( - self, predefined_options: dict, not_options: Optional[list] = None + self, + predefined_options: dict[str, Any], + not_options: Optional[list[str]] = None, ): options = super()._get_pkg_options(predefined_options, not_options=not_options) # Add amount of segments diff --git a/imod/mf6/hfb.py b/imod/mf6/hfb.py index b806b2c5c..45513606b 100644 --- a/imod/mf6/hfb.py +++ b/imod/mf6/hfb.py @@ -5,7 +5,7 @@ from copy import deepcopy from enum import Enum from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Self, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Self, Tuple import cftime import numpy as np @@ -186,7 +186,7 @@ def to_connected_cells_dataset( idomain: GridDataArray, grid: xu.Ugrid2d, edge_index: np.ndarray, - edge_values: dict, + edge_values: dict[str, Any], ) -> xr.Dataset: """ Converts a cell edge grid with values defined on the edges to a dataset with the cell ids of the connected cells, @@ -416,7 +416,7 @@ def _prepare_barrier_dataset_for_mf6_adapter(dataset: xr.Dataset) -> xr.Dataset: def _snap_to_grid_and_aggregate( barrier_dataframe: GeoDataFrameType, grid2d: xu.Ugrid2d, vardict_agg: dict[str, str] -) -> tuple[xu.UgridDataset, npt.NDArray]: +) -> tuple[xu.UgridDataset, npt.NDArray[Any]]: """ Snap barrier dataframe to grid and aggregate multiple lines with a list of methods per variable. @@ -481,7 +481,7 @@ def __init__( geometry: "gpd.GeoDataFrame", print_input: bool = False, ) -> None: - dict_dataset = {"print_input": print_input} + dict_dataset: dict[str, Any] = {"print_input": print_input} super().__init__(dict_dataset) self.line_data = geometry @@ -859,7 +859,7 @@ def _get_variable_name(self) -> str: raise NotImplementedError @abc.abstractmethod - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: raise NotImplementedError def clip_box( @@ -1143,7 +1143,7 @@ def _get_barrier_type(self): def _get_variable_name(self) -> str: return "hydraulic_characteristic" - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: return [] def _compute_barrier_values( @@ -1220,7 +1220,7 @@ def _get_barrier_type(self): def _get_variable_name(self) -> str: return "hydraulic_characteristic" - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: return ["layer"] def _compute_barrier_values( @@ -1294,7 +1294,7 @@ def _get_barrier_type(self): def _get_variable_name(self) -> str: return "multiplier" - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: return [] def _compute_barrier_values( @@ -1373,7 +1373,7 @@ def _get_barrier_type(self): def _get_variable_name(self) -> str: return "multiplier" - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: return ["layer"] def _compute_barrier_values( @@ -1467,7 +1467,7 @@ def _get_barrier_type(self): def _get_variable_name(self) -> str: return "resistance" - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: return [] def _compute_barrier_values( @@ -1539,7 +1539,7 @@ def _get_barrier_type(self): def _get_variable_name(self) -> str: return "resistance" - def _get_vertical_variables(self) -> list: + def _get_vertical_variables(self) -> list[str]: return ["layer"] def _compute_barrier_values( diff --git a/imod/mf6/mf6_hfb_adapter.py b/imod/mf6/mf6_hfb_adapter.py index 3c9e35bcb..d922264fc 100644 --- a/imod/mf6/mf6_hfb_adapter.py +++ b/imod/mf6/mf6_hfb_adapter.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import Union +from typing import Any, Union import numpy as np import xarray as xr @@ -115,7 +115,7 @@ def __init__( print_input: Union[bool, xr.DataArray] = False, validate: Union[bool, xr.DataArray] = True, ): - dict_dataset = { + dict_dataset: dict[str, Any] = { "cell_id1": cell_id1, "cell_id2": cell_id2, "layer": layer, diff --git a/imod/mf6/model.py b/imod/mf6/model.py index f95c9a712..e895a29f1 100644 --- a/imod/mf6/model.py +++ b/imod/mf6/model.py @@ -90,7 +90,7 @@ def _create_boundary_condition_clipped_boundary( original_model: Modflow6Model, clipped_model: Modflow6Model, state_for_boundary: Optional[GridDataArray], - clip_box_args: tuple, + clip_box_args: tuple[Any, ...], ) -> Optional[StateType]: # Create temporary boundary condition for the original model boundary. This # is used later to see which boundaries can be ignored as they were already @@ -146,7 +146,7 @@ def _create_boundary_condition_clipped_boundary( return bc_constant_pkg -class Modflow6Model(collections.UserDict, IModel, abc.ABC): +class Modflow6Model(collections.UserDict[str, Package], IModel, abc.ABC): _mandatory_packages: tuple[str, ...] = () _init_schemata: SchemataDict = {} _model_id: Optional[str] = None @@ -165,7 +165,7 @@ def __init__(self): @standard_log_decorator() def _validate_options( - self, schemata: dict, **kwargs + self, schemata: dict[str, Any], **kwargs ) -> dict[str, list[ValidationError]]: return validate_schemata_dict(schemata, self._options, **kwargs) @@ -568,7 +568,7 @@ def _write( globaltimes=globaltimes, write_context=pkg_write_context, ) - elif issubclass(type(pkg), imod.mf6.HorizontalFlowBarrierBase): + elif isinstance(pkg, imod.mf6.HorizontalFlowBarrierBase): mf6_hfb_ls.append(pkg) else: pkg._write( @@ -656,7 +656,7 @@ def dump( if statusinfo.has_errors(): raise ValidationError(statusinfo.to_string()) - toml_content: dict = collections.defaultdict(dict) + toml_content: dict[str, Any] = collections.defaultdict(dict) for pkgname, pkg in self.items(): pkg_path = pkg.to_file( @@ -696,7 +696,7 @@ def from_file(cls, toml_path): return instance @property - def options(self) -> dict: + def options(self) -> dict[str, Any]: if self._options is None: raise ValueError("Model id has not been set") return self._options diff --git a/imod/mf6/out/__init__.py b/imod/mf6/out/__init__.py index ca74648c7..e5596cbf7 100644 --- a/imod/mf6/out/__init__.py +++ b/imod/mf6/out/__init__.py @@ -63,7 +63,7 @@ def open_cbc( "disu": disu.open_hds, } -_OPEN_CBC: Dict[str, Callable] = { +_OPEN_CBC: Dict[str, Callable[..., Any]] = { "dis": dis.open_cbc, "disv": disv.open_cbc, "disu": disu.open_cbc, @@ -76,7 +76,7 @@ def open_cbc( } -def _get_function(d: Dict[str, Callable], key: str) -> Callable: +def _get_function(d: Dict[str, Callable[..., Any]], key: str) -> Callable[..., Any]: try: func = d[key] except KeyError: diff --git a/imod/mf6/out/cbc.py b/imod/mf6/out/cbc.py index 5c48a83ea..65979428c 100644 --- a/imod/mf6/out/cbc.py +++ b/imod/mf6/out/cbc.py @@ -266,7 +266,7 @@ def read_imeth6_budgets_dense( dtype: np.dtype, pos: int, size: int, - shape: tuple, + shape: Tuple[int, ...], return_variable: str, indices: np.ndarray | None, ) -> FloatArray: diff --git a/imod/mf6/out/dis.py b/imod/mf6/out/dis.py index 66ab49ed3..8df3f752c 100644 --- a/imod/mf6/out/dis.py +++ b/imod/mf6/out/dis.py @@ -208,7 +208,7 @@ def open_dvs( def open_imeth1_budgets( - cbc_path: FilePath, grb_content: dict, header_list: List[cbc.Imeth1Header] + cbc_path: FilePath, grb_content: Dict[str, Any], header_list: List[cbc.Imeth1Header] ) -> xr.DataArray: """ Open the data for an imeth==1 budget section. Data is read lazily per @@ -252,7 +252,7 @@ def open_imeth1_budgets( def open_imeth6_budgets( cbc_path: FilePath, - grb_content: dict, + grb_content: Dict[str, Any], header_list: List[cbc.Imeth6Header], return_variable: str = "budget", indices: np.ndarray | None = None, @@ -374,7 +374,7 @@ def dis_indices( def dis_to_right_front_lower_indices( - grb_content: dict, + grb_content: Dict[str, Any], ) -> Tuple[xr.DataArray, xr.DataArray, xr.DataArray]: """ Infer the indices to extract right, front, and lower face flows from the @@ -442,7 +442,7 @@ def dis_extract_face_budgets( def dis_open_face_budgets( - cbc_path: FilePath, grb_content: dict, header_list: List[cbc.Imeth1Header] + cbc_path: FilePath, grb_content: Dict[str, Any], header_list: List[cbc.Imeth1Header] ) -> Tuple[xr.DataArray, xr.DataArray, xr.DataArray]: """ Open the flow-ja-face, and extract right, front, and lower face flows. diff --git a/imod/mf6/out/disu.py b/imod/mf6/out/disu.py index 6aad3d37b..1792385bb 100644 --- a/imod/mf6/out/disu.py +++ b/imod/mf6/out/disu.py @@ -88,13 +88,13 @@ def open_dvs( def open_imeth1_budgets( - cbc_path: FilePath, grb_content: dict, header_list: List[cbc.Imeth1Header] + cbc_path: FilePath, grb_content: Dict[str, Any], header_list: List[cbc.Imeth1Header] ) -> xr.DataArray: raise NotImplementedError def open_imeth6_budgets( - cbc_path: FilePath, grb_content: dict, header_list: List[cbc.Imeth6Header] + cbc_path: FilePath, grb_content: Dict[str, Any], header_list: List[cbc.Imeth6Header] ) -> xr.DataArray: raise NotImplementedError diff --git a/imod/mf6/out/disv.py b/imod/mf6/out/disv.py index d1befad55..5b47c7982 100644 --- a/imod/mf6/out/disv.py +++ b/imod/mf6/out/disv.py @@ -246,7 +246,7 @@ def open_dvs( def open_imeth1_budgets( - cbc_path: FilePath, grb_content: dict, header_list: List[cbc.Imeth1Header] + cbc_path: FilePath, grb_content: Dict[str, Any], header_list: List[cbc.Imeth1Header] ) -> xu.UgridDataArray: """ Open the data for an imeth==1 budget section. Data is read lazily per @@ -292,7 +292,7 @@ def open_imeth1_budgets( def open_imeth6_budgets( cbc_path: FilePath, - grb_content: dict, + grb_content: Dict[str, Any], header_list: List[cbc.Imeth6Header], return_variable: str = "budget", return_id: np.ndarray | None = None, @@ -476,7 +476,7 @@ def disv_indices( def disv_to_horizontal_lower_indices( - grb_content: dict, + grb_content: Dict[str, Any], ) -> Tuple[xr.DataArray, xr.DataArray, xr.DataArray, xr.DataArray]: grid = grb_content["grid"] lower, horizontal = disv_indices( @@ -552,7 +552,7 @@ def disv_extract_horizontal_budget( def disv_open_face_budgets( - cbc_path: FilePath, grb_content: dict, header_list: List[cbc.Imeth1Header] + cbc_path: FilePath, grb_content: Dict[str, Any], header_list: List[cbc.Imeth1Header] ) -> Tuple[xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray, xu.UgridDataArray]: horizontal_index, u, v, lower_index = disv_to_horizontal_lower_indices(grb_content) budgets = cbc.open_imeth1_budgets(cbc_path, header_list) diff --git a/imod/mf6/package.py b/imod/mf6/package.py index 79ebedb9d..54898acf7 100644 --- a/imod/mf6/package.py +++ b/imod/mf6/package.py @@ -322,7 +322,9 @@ def _write( self._write_text_griddata(path, da, dtype) @standard_log_decorator() - def _validate(self, schemata: dict, **kwargs) -> dict[str, list[ValidationError]]: + def _validate( + self, schemata: dict[str, Any], **kwargs + ) -> dict[str, list[ValidationError]]: ds = trim_time_dimension(self.dataset, **kwargs) return validate_schemata_dict(schemata, ds, **kwargs) @@ -612,7 +614,7 @@ def _get_non_grid_data(self, grid_names: list[str]) -> dict[str, Any]: return result def _call_func_on_grids( - self, func: Callable, dis: dict + self, func: Callable[..., Any], dis: dict[str, Any] ) -> dict[str, GridDataArray]: """ Call function on dictionary of grids and merge settings back into diff --git a/imod/mf6/simulation.py b/imod/mf6/simulation.py index 0e2013a8a..1c9b32801 100644 --- a/imod/mf6/simulation.py +++ b/imod/mf6/simulation.py @@ -63,7 +63,7 @@ merge_partitions, ) -OUTPUT_FUNC_MAPPING: dict[str, Callable] = { +OUTPUT_FUNC_MAPPING: dict[str, Callable[..., Any]] = { "head": open_hds, "concentration": open_conc, "budget-flow": open_cbc, @@ -92,7 +92,7 @@ def get_packages(simulation: Modflow6Simulation) -> dict[str, Package]: } -class Modflow6Simulation(collections.UserDict, ISimulation): +class Modflow6Simulation(collections.UserDict[str, Any], ISimulation): """ Modflow6Simulation is a class that represents a Modflow 6 simulation. It contains data on simulation timing, models that are present in the @@ -1028,7 +1028,7 @@ def dump( directory = pathlib.Path(directory) directory.mkdir(parents=True, exist_ok=True) - toml_content: DefaultDict[str, dict] = collections.defaultdict(dict) + toml_content: DefaultDict[str, dict[str, Any]] = collections.defaultdict(dict) # Dump version number version = get_version() toml_content["version"] = {"imod-python": version} @@ -1135,7 +1135,7 @@ def from_file(toml_path): return simulation - def get_exchange_relationships(self) -> list: + def get_exchange_relationships(self) -> list[Any]: """ Get exchange relationships in the simulation. @@ -1471,6 +1471,8 @@ def split( original_model_name_to_solution: dict[str, Solution] = {} for model_name, model in original_models.items(): solution_name = self.get_solution_name(model_name) + if solution_name is None: + raise ValueError(f"Could not find a solution for model '{model_name}'") solution = cast(Solution, new_simulation[solution_name]) original_model_name_to_solution[model_name] = solution @@ -1561,7 +1563,7 @@ def _set_flow_exchange_options(self) -> None: # collect some options that we will auto-set for exchange in self["split_exchanges"]: if isinstance(exchange, GWFGWF): - model_name_1 = exchange.dataset["model_name_1"].values[()] + model_name_1 = str(exchange.dataset["model_name_1"].values[()]) model_1 = self[model_name_1] exchange.set_options( save_flows=model_1["oc"].is_budget_output, @@ -1574,7 +1576,7 @@ def _set_flow_exchange_options(self) -> None: def _set_transport_exchange_options(self) -> None: for exchange in self["split_exchanges"]: if isinstance(exchange, GWTGWT): - model_name_1 = exchange.dataset["model_name_1"].values[()] + model_name_1 = str(exchange.dataset["model_name_1"].values[()]) model_1 = self[model_name_1] advection_key = model_1._get_pkgkey("adv") dispersion_key = model_1._get_pkgkey("dsp") diff --git a/imod/mf6/ssm.py b/imod/mf6/ssm.py index 084e39a4d..2737c0051 100644 --- a/imod/mf6/ssm.py +++ b/imod/mf6/ssm.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import Self +from typing import Any, Self import numpy as np @@ -11,7 +11,7 @@ from imod.schemata import DTypeSchema -def with_index_dim(array_like) -> tuple[str, list]: +def with_index_dim(array_like) -> tuple[str, list[Any]]: # At least1d will also extract the values if array_like is a DataArray. arr1d = np.atleast_1d(array_like) if arr1d.ndim > 1: diff --git a/imod/mf6/wel.py b/imod/mf6/wel.py index 47e9a0870..ab4a62fa8 100644 --- a/imod/mf6/wel.py +++ b/imod/mf6/wel.py @@ -5,7 +5,7 @@ import textwrap from collections.abc import Iterable from datetime import datetime -from typing import Any, Callable, Optional, Self, Sequence, Tuple, Union, cast +from typing import Any, Callable, Optional, Self, Sequence, Union, cast import cftime import numpy as np @@ -50,7 +50,7 @@ ABSTRACT_METH_ERROR_MSG = "Method in abstract base class called" -def _assign_dims(arg: Any) -> Tuple | xr.DataArray: +def _assign_dims(arg: Any) -> tuple[Any, ...] | xr.DataArray: is_da = isinstance(arg, xr.DataArray) if is_da and "time" in arg.coords: if arg.ndim != 2: @@ -113,7 +113,7 @@ def _df_groups_to_da_rates( def _prepare_well_rates_from_groups( - pkg_data: dict, + pkg_data: dict[str, Any], unique_well_groups: Sequence[pd.api.typing.DataFrameGroupBy], start_times: StressPeriodTimesType, ) -> xr.DataArray: @@ -141,7 +141,7 @@ def _process_timeseries( def _prepare_df_ipf_associated( - pkg_data: dict, all_well_times: list[datetime] + pkg_data: dict[str, Any], all_well_times: list[datetime] ) -> pd.DataFrame: """Prepare dataframe for an ipf with associated timeseries in a textfile.""" # Validate if associated wells are assigned multiple layers, factors, @@ -170,7 +170,7 @@ def _prepare_df_ipf_associated( def _prepare_df_ipf_unassociated( - pkg_data: dict, start_times: StressPeriodTimesType + pkg_data: dict[str, Any], start_times: StressPeriodTimesType ) -> pd.DataFrame: """Prepare dataframe for an ipf with no associated timeseries.""" is_steady_state = any(t is None for t in pkg_data["time"]) @@ -216,7 +216,9 @@ def _prepare_df_ipf_unassociated( def _unpack_package_data( - pkg_data: dict, start_times: StressPeriodTimesType, all_well_times: list[datetime] + pkg_data: dict[str, Any], + start_times: StressPeriodTimesType, + all_well_times: list[datetime], ) -> pd.DataFrame: """Unpack package data to dataframe""" has_associated = pkg_data["has_associated"] @@ -226,7 +228,7 @@ def _unpack_package_data( return _prepare_df_ipf_unassociated(pkg_data, start_times) -def get_all_imod5_prj_well_times(imod5_data: dict) -> list[datetime]: +def get_all_imod5_prj_well_times(imod5_data: dict[str, Any]) -> list[datetime]: """Get all times a well data is defined on in a prj file""" wel_keys = [key for key in imod5_data.keys() if key.startswith("wel")] wel_times_per_pkg = [imod5_data[wel_key]["time"] for wel_key in wel_keys] @@ -239,9 +241,9 @@ def get_all_imod5_prj_well_times(imod5_data: dict) -> list[datetime]: def derive_cellid_from_points( dst_grid: GridDataArray, - x: list, - y: list, - layer: list, + x: list[Any], + y: list[Any], + layer: list[Any], ) -> GridDataArray: """ Create DataArray with Modflow6 cell identifiers based on x, y coordinates @@ -583,7 +585,7 @@ def _assign_wells_to_layers( @classmethod def _validate_imod5_depth_information( - cls, key: str, pkg_data: dict, df: pd.DataFrame + cls, key: str, pkg_data: dict[str, Any], df: pd.DataFrame ) -> None: raise NotImplementedError(ABSTRACT_METH_ERROR_MSG) @@ -724,7 +726,7 @@ def from_imod5_data( def _cleanup( self, dis: StructuredDiscretization | VerticesDiscretization, - cleanup_func: Callable, + cleanup_func: Callable[..., Any], **cleanup_kwargs, ) -> None: # Work around mypy error, .data_vars cannot be used with xu.UgridDataset @@ -1079,7 +1081,9 @@ def _create_wells_df(self) -> pd.DataFrame: return wells_df @standard_log_decorator() - def _validate(self, schemata: dict, **kwargs) -> dict[str, list[ValidationError]]: + def _validate( + self, schemata: dict[str, Any], **kwargs + ) -> dict[str, list[ValidationError]]: kwargs["screen_top"] = self.dataset["screen_top"] return Package._validate(self, schemata, **kwargs) @@ -1119,7 +1123,7 @@ def _assign_wells_to_layers( @classmethod def _validate_imod5_depth_information( - cls, key: str, pkg_data: dict, df: pd.DataFrame + cls, key: str, pkg_data: dict[str, Any], df: pd.DataFrame ) -> None: if "layer" in pkg_data.keys() and (np.any(np.array(pkg_data["layer"]) != 0)): log_msg = textwrap.dedent( @@ -1416,7 +1420,7 @@ def _assign_wells_to_layers( @classmethod def _validate_imod5_depth_information( - cls, key: str, pkg_data: dict, df: pd.DataFrame + cls, key: str, pkg_data: dict[str, Any], df: pd.DataFrame ) -> None: if np.any(np.array(pkg_data["layer"]) == 0): log_msg = textwrap.dedent( diff --git a/imod/msw/fixed_format.py b/imod/msw/fixed_format.py index b5bddfcb2..d227b75d0 100644 --- a/imod/msw/fixed_format.py +++ b/imod/msw/fixed_format.py @@ -1,7 +1,7 @@ import warnings from dataclasses import dataclass from pathlib import Path -from typing import Optional, Union +from typing import Any, Optional, Union import numpy as np @@ -44,7 +44,7 @@ def fixed_format_parser( Dictionary with the VariableMetaData. Access this dictionary in a package by calling ._metadata_dict """ - results: dict[str, list] = {} + results: dict[str, list[Any]] = {} for key in metadata_dict: results[key] = [] diff --git a/imod/msw/initial_conditions.py b/imod/msw/initial_conditions.py index cba45f704..f28acb8b1 100644 --- a/imod/msw/initial_conditions.py +++ b/imod/msw/initial_conditions.py @@ -19,7 +19,7 @@ class InitialConditionsEquilibrium(MetaSwapPackage, IRegridPackage): _file_name = "init_svat.inp" _option = "Equilibrium" - _metadata_dict: dict = {} + _metadata_dict: dict[str, Any] = {} def __init__(self): super().__init__() @@ -73,7 +73,7 @@ class InitialConditionsPercolation(MetaSwapPackage, IRegridPackage): _file_name = "init_svat.inp" _option = "MeteoInputP" - _metadata_dict: dict = {} + _metadata_dict: dict[str, Any] = {} _regrid_method: DataclassType = EmptyRegridMethod() def __init__(self): @@ -99,7 +99,7 @@ class InitialConditionsSavedState(MetaSwapPackage): _file_name = "init_svat.inp" _option = "Saved_State" - _metadata_dict: dict = {} + _metadata_dict: dict[str, Any] = {} def __init__(self, saved_state): super().__init__() diff --git a/imod/msw/meteo_grid.py b/imod/msw/meteo_grid.py index fb7f4e8aa..d4c080412 100644 --- a/imod/msw/meteo_grid.py +++ b/imod/msw/meteo_grid.py @@ -1,7 +1,7 @@ import csv from pathlib import Path from shutil import copyfile -from typing import Optional, Union +from typing import Any, Optional, Union import numpy as np import pandas as pd @@ -75,7 +75,7 @@ def _write_free_format_file(self, path: Union[str, Path], dataframe: pd.DataFram ) def _compose_filename( - self, d: dict, directory: Path, pattern: Optional[str] = None + self, d: dict[str, Any], directory: Path, pattern: Optional[str] = None ): """ Construct a filename, following the iMOD conventions. diff --git a/imod/msw/model.py b/imod/msw/model.py index ce40d6b8d..ce12c6ff0 100644 --- a/imod/msw/model.py +++ b/imod/msw/model.py @@ -95,7 +95,7 @@ } -class Model(collections.UserDict): +class Model(collections.UserDict[str, Any]): def __setitem__(self, key, value): # TODO: Add packagecheck super().__setitem__(key, value) @@ -317,7 +317,7 @@ def _write_simulation_settings(self, directory: Path) -> None: # Add IdfMapping settings idf_key = self.get_pkgkey(IdfMapping) - idf_pkg = cast(IdfMapping, self[idf_key]) + idf_pkg = cast(IdfMapping, self[idf_key]) # type: ignore[index] simulation_settings.update(idf_pkg._get_output_settings()) simulation_settings["unsa_svat_path"] = self._render_unsaturated_database_path( @@ -359,7 +359,7 @@ def write( # Get index and svat grid_key = self.get_pkgkey(GridData) - grid_pkg = cast(GridData, self[grid_key]) + grid_pkg = cast(GridData, self[grid_key]) # type: ignore[index] index, svat = grid_pkg.generate_isactive_svat_arrays() # write package contents @@ -560,7 +560,7 @@ def split( # First, handle the grid data and determine the overlap. grid_key = self.get_pkgkey(GridData) - grid_pkg = cast(GridData, self[grid_key]) + grid_pkg = cast(GridData, self[grid_key]) # type: ignore[index] is_in_active_domain = {} for submodel_name, submodel in partitioned_submodels.items(): diff --git a/imod/msw/output_control.py b/imod/msw/output_control.py index c862d4eed..1106aedaf 100644 --- a/imod/msw/output_control.py +++ b/imod/msw/output_control.py @@ -53,7 +53,7 @@ class VariableOutputControl(MetaSwapPackage, IRegridPackage): """ _file_name = "sel_key_svat_per.inp" - _settings: dict = {} + _settings: dict[str, Any] = {} _metadata_dict = { "variable": VariableMetaData(10, None, None, str), "option": VariableMetaData(10, 0, 3, int), @@ -113,7 +113,7 @@ class TimeOutputControl(MetaSwapPackage, IRegridPackage): """ _file_name = "tiop_sim.inp" - _settings: dict = {} + _settings: dict[str, Any] = {} _metadata_dict = { "time_since_start_year": VariableMetaData(15, 0.0, 366.0, float), "year": VariableMetaData(6, 1, 9999, int), diff --git a/imod/msw/pkgbase.py b/imod/msw/pkgbase.py index 442644fc2..a379910af 100644 --- a/imod/msw/pkgbase.py +++ b/imod/msw/pkgbase.py @@ -34,10 +34,10 @@ class MetaSwapPackage(abc.ABC): __slots__ = "_pkg_id" _file_name = "filename_not_set" _regrid_method: DataclassType = EmptyRegridMethod() - _with_subunit: tuple = () - _without_subunit: tuple = () - _to_fill: tuple = () - _metadata_dict: dict = {} + _with_subunit: tuple[Any, ...] = () + _without_subunit: tuple[Any, ...] = () + _to_fill: tuple[Any, ...] = () + _metadata_dict: dict[str, Any] = {} def __init__(self): self.dataset = xr.Dataset() diff --git a/imod/prepare/spatial.py b/imod/prepare/spatial.py index 96cee913d..a816cff12 100644 --- a/imod/prepare/spatial.py +++ b/imod/prepare/spatial.py @@ -1,5 +1,5 @@ import pathlib -from typing import TYPE_CHECKING, Callable, Iterable, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Callable, Iterable, Optional, Union, cast import dask import numba @@ -233,9 +233,9 @@ def rasterize( # shapes must be an iterable try: iter(shapes) - iterable_shapes: Iterable = shapes + iterable_shapes: Iterable[Any] = shapes except TypeError: - iterable_shapes: Iterable = (shapes,) # type: ignore[no-redef] + iterable_shapes: Iterable[Any] = (shapes,) # type: ignore[no-redef] if dtype is None: dtype = like.dtype @@ -856,7 +856,7 @@ def _zonal_aggregate_raster( column: str, resolution: float, raster: xr.DataArray, - method: Union[str, Callable], + method: Union[str, Callable[..., Any]], ) -> pd.DataFrame: """ * Rasterize the polygon at given `resolution` @@ -916,7 +916,7 @@ def _zonal_aggregate_polygons( column_b: str, resolution: float, like: xr.DataArray, - method: Union[str, Callable], + method: Union[str, Callable[..., Any]], ) -> pd.DataFrame: """ * Rasterize a, rasterize b for the same domain @@ -965,7 +965,7 @@ def zonal_aggregate_raster( column: str, raster: xr.DataArray, resolution: float, - method: Union[str, Callable], + method: Union[str, Callable[..., Any]], chunksize: int = 10_000, ) -> pd.DataFrame: """ @@ -1048,7 +1048,7 @@ def zonal_aggregate_polygons( column_b: str, like: xr.DataArray, resolution: float, - method: Union[str, Callable], + method: Union[str, Callable[..., Any]], chunksize: int = 10_000, ) -> pd.DataFrame: """ diff --git a/imod/prepare/wells.py b/imod/prepare/wells.py index 16cd0c378..e742aaa7d 100644 --- a/imod/prepare/wells.py +++ b/imod/prepare/wells.py @@ -170,7 +170,7 @@ def locate_wells( def validate_well_columnnames( - wells: pd.DataFrame, names: set = {"x", "y", "id"} + wells: pd.DataFrame, names: set[str] = {"x", "y", "id"} ) -> None: missing = names.difference(wells.columns) if missing: diff --git a/imod/select/points.py b/imod/select/points.py index b0322cac4..375c6f354 100644 --- a/imod/select/points.py +++ b/imod/select/points.py @@ -10,7 +10,9 @@ from imod.typing import GridDataArray -def get_unstructured_cell2d_from_xy(uda: xu.UgridDataArray, **points) -> npt.NDArray: +def get_unstructured_cell2d_from_xy( + uda: xu.UgridDataArray, **points +) -> npt.NDArray[np.intp]: # Unstructured grids always require to be tested both on x and y coordinates # to see if points are within bounds. for coord in ["x", "y"]: @@ -24,7 +26,7 @@ def get_unstructured_cell2d_from_xy(uda: xu.UgridDataArray, **points) -> npt.NDA return uda.ugrid.grid.locate_points(xy) -def _check_and_get_points_shape(points: dict) -> dict: +def _check_and_get_points_shape(points: dict[str, Any]) -> dict[str, Any]: """Check whether points have the right shape""" shapes = {} for coord, value in points.items(): @@ -39,13 +41,13 @@ def _check_and_get_points_shape(points: dict) -> dict: return shapes -def _check_point_shapes_consistency(shapes: dict): +def _check_point_shapes_consistency(shapes: dict[str, Any]) -> None: if not len(set(shapes.values())) == 1: msg = "\n".join([f"{coord}: {shape}" for coord, shape in shapes.items()]) raise ValueError(f"Shapes of coordinates do match each other:\n{msg}") -def _check_points(points: dict): +def _check_points(points: dict[str, Any]) -> None: """ Check whether the array with points has the right and consistent shape. """ @@ -54,7 +56,7 @@ def _check_points(points: dict): _check_point_shapes_consistency(shapes) -def _arr_like_points(points: dict, fill_value: Any) -> npt.NDArray: +def _arr_like_points(points: dict[str, Any], fill_value: Any) -> npt.NDArray[Any]: """ Return array with the same shape as the first array provided in points. """ @@ -120,7 +122,9 @@ def points_in_bounds(da: GridDataArray, **points) -> npt.NDArray[np.bool_]: return in_bounds -def check_points_in_bounds(da: GridDataArray, points: dict, out_of_bounds: str): +def check_points_in_bounds( + da: GridDataArray, points: dict[str, Any], out_of_bounds: str +): inside = points_in_bounds(da, **points) # Error handling msg = "Not all points are located within the bounds of the DataArray" diff --git a/imod/typing/grid.py b/imod/typing/grid.py index 89aa5d868..815a964b5 100644 --- a/imod/typing/grid.py +++ b/imod/typing/grid.py @@ -107,7 +107,7 @@ def _force_decreasing_y(structured_grid: xr.DataArray | xr.Dataset): return structured_grid -def _get_first_item(objects: Sequence): +def _get_first_item(objects: Sequence[Any]) -> Any: return next(iter(objects)) @@ -115,8 +115,8 @@ def _get_first_item(objects: Sequence): # isinstance testing def _type_dispatch_functions_on_grid_sequence( objects: Sequence[GridDataArray | GridDataset], - unstructured_func: Callable, - structured_func: Callable, + unstructured_func: Callable[..., Any], + structured_func: Callable[..., Any], *args, **kwargs, ) -> GridDataArray | GridDataset: @@ -144,8 +144,8 @@ def _type_dispatch_functions_on_grid_sequence( # to manual type testing def _type_dispatch_functions_on_dict( dict_of_objects: Mapping[str, GridDataArray | float | bool | int], - unstructured_func: Callable, - structured_func: Callable, + unstructured_func: Callable[..., Any], + structured_func: Callable[..., Any], *args, **kwargs, ): @@ -206,7 +206,9 @@ def concat( ) -def merge_unstructured_dataset(variables_to_merge: list[dict], *args, **kwargs): +def merge_unstructured_dataset( + variables_to_merge: list[dict[str, Any]], *args, **kwargs +): """ Work around xugrid issue https://github.com/Deltares/xugrid/issues/179 @@ -524,7 +526,7 @@ class GridCache: unique geometry hash. """ - def __init__(self, func: Callable, max_cache_size=5): + def __init__(self, func: Callable[..., Any], max_cache_size: int = 5): self.max_cache_size = max_cache_size self.grid_cache: dict[int, GridDataArray] = {} self.func = func diff --git a/imod/util/nested_dict.py b/imod/util/nested_dict.py index 901918a60..7be774b0c 100644 --- a/imod/util/nested_dict.py +++ b/imod/util/nested_dict.py @@ -1,9 +1,9 @@ import collections import functools -from typing import Any, Dict, List +from typing import Any -def initialize_nested_dict(depth: int) -> collections.defaultdict: +def initialize_nested_dict(depth: int) -> collections.defaultdict[Any, Any]: """ Initialize a nested dict with a fixed depth @@ -27,7 +27,7 @@ def initialize_nested_dict(depth: int) -> collections.defaultdict: if depth == 0 or depth == 1: return collections.defaultdict(dict) else: - d: functools.partial[collections.defaultdict] = functools.partial( + d: functools.partial[collections.defaultdict[Any, Any]] = functools.partial( collections.defaultdict, dict ) for _ in range(depth - 2): @@ -35,7 +35,9 @@ def initialize_nested_dict(depth: int) -> collections.defaultdict: return collections.defaultdict(d) -def set_nested(d: collections.defaultdict, keys: List[str], value: Any) -> None: +def set_nested( + d: collections.defaultdict[Any, Any], keys: list[str], value: Any +) -> None: """ Set in the deepest dict of a set of nested dictionaries, as created by the initialize_nested_dict function above. @@ -59,7 +61,7 @@ def set_nested(d: collections.defaultdict, keys: List[str], value: Any) -> None: set_nested(d[keys[0]], keys[1:], value) -def append_nested_dict(dict1: Dict, dict2: Dict) -> None: +def append_nested_dict(dict1: dict[Any, Any], dict2: dict[Any, Any]) -> None: """ Recursively walk through two dicts to append dict2 to dict1. @@ -89,7 +91,7 @@ def append_nested_dict(dict1: Dict, dict2: Dict) -> None: dict1[key] = val -def sorted_nested_dict(d: Dict) -> list: +def sorted_nested_dict(d: dict[Any, Any]) -> list[Any]: """ Sorts a variably nested dict (of dicts) by keys. diff --git a/imod/util/path.py b/imod/util/path.py index 160526231..23499b255 100644 --- a/imod/util/path.py +++ b/imod/util/path.py @@ -8,15 +8,13 @@ import pathlib import re import tempfile -from typing import Any, Dict, Optional +from typing import Any, Optional import cftime import numpy as np from imod.util.time import _compose_timestring, to_datetime -Pattern = re.Pattern - def _custom_pattern_to_regex_pattern(pattern: str): """ @@ -34,7 +32,7 @@ def _custom_pattern_to_regex_pattern(pattern: str): return re.compile(simple_regex) -def _groupdict_from_pattern(stem: str, pattern: Pattern) -> Dict: +def _groupdict_from_pattern(stem: str, pattern: re.Pattern[str]) -> dict[str, Any]: """ Use a compiled regex pattern to extract the variables from the stem. Returns empty dict if no match is found. @@ -46,9 +44,9 @@ def _groupdict_from_pattern(stem: str, pattern: Pattern) -> Dict: return match.groupdict() -def _groupdict(stem: str, pattern: Optional[str | Pattern]) -> Dict: +def _groupdict(stem: str, pattern: Optional[str | re.Pattern[str]]) -> dict[str, Any]: if pattern is not None: - if isinstance(pattern, Pattern): + if isinstance(pattern, re.Pattern): d = _groupdict_from_pattern(stem, pattern) else: re_pattern = _custom_pattern_to_regex_pattern(pattern) @@ -78,7 +76,7 @@ def _groupdict(stem: str, pattern: Optional[str | Pattern]) -> Dict: return d -def decompose(path, pattern: Optional[str] = None) -> Dict[str, Any]: +def decompose(path, pattern: Optional[str] = None) -> dict[str, Any]: r""" Parse a path, returning a dict of the parts, following the iMOD conventions. diff --git a/pyproject.toml b/pyproject.toml index 2839a5294..2910acb20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,10 +100,6 @@ packages = ["imod"] follow_imports = "normal" strict = true -# These shouldn't be too much additional work, but may be tricky to -# get passing if you use a lot of untyped libraries -disallow_any_generics = false - # These next few are various gradations of forcing use of type annotations disallow_untyped_calls = false disallow_incomplete_defs = false @@ -124,7 +120,7 @@ module = [ "imod.data.*", "imod.evaluate.*", "imod.formats.*", - "imod.schemata", + "imod.schemata.*", "imod.templates.*", "imod.tests.*", "imod.visualize.*",