diff --git a/.github/workflows/typing-check.yml b/.github/workflows/typing-check.yml index 940fb9fc4..b84824688 100644 --- a/.github/workflows/typing-check.yml +++ b/.github/workflows/typing-check.yml @@ -30,7 +30,7 @@ jobs: # working-directory: ./${{ matrix.package-name }} run: | python -m pip install --upgrade pip - python -m pip install mypy ruff types-PyYAML types-pytz + python -m pip install mypy ruff types-PyYAML types-paramiko types-pytz - name: Typing check with mypy # working-directory: ./${{ matrix.package-name }} diff --git a/.gitignore b/.gitignore index 5f08477d3..6275d935c 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,9 @@ MANIFEST *.manifest *.spec +#env (do not port user-specs) +.env + # Installer logs pip-log.txt pip-delete-this-directory.txt @@ -165,4 +168,4 @@ cython_debug/ #.idea/ # VSCode -.vscode \ No newline at end of file +.vscode diff --git a/geos-trame/README.rst b/geos-trame/README.rst index a1c54deb2..6e519cdad 100644 --- a/geos-trame/README.rst +++ b/geos-trame/README.rst @@ -21,7 +21,21 @@ Build and install the Vue components cd vue-components npm i npm run build - cd - + cd .. + +then configure the .env + + sh configure.sh + +this will generate a `dotenv` environement file defining useful path to trame, + +.. code-block:: console + + cat .env + TEMPLATE_DIR=/path/to/geosPythonPackages/geos-trame/src/geos/trame/io/jinja_t + ASSETS_DIR=/path/to/geosPythonPackages/geos-trame/src/geos/trame/assets + +those will have lower precedence than local environement variables if defined Install the application diff --git a/geos-trame/configure.sh b/geos-trame/configure.sh new file mode 100644 index 000000000..f46d8ff9a --- /dev/null +++ b/geos-trame/configure.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +echo "TEMPLATE_DIR=${PWD}/src/geos/trame/app/io/jinja_t/" >> ${PWD}/src/geos/trame/assets/.env +echo "ASSETS_DIR=${PWD}/src/geos/trame/assets/" >> ${PWD}/src/geos/trame/assets/.env \ No newline at end of file diff --git a/geos-trame/pyproject.toml b/geos-trame/pyproject.toml index 73d7a028d..e751b49b0 100644 --- a/geos-trame/pyproject.toml +++ b/geos-trame/pyproject.toml @@ -31,18 +31,19 @@ keywords = [ dependencies = [ "setuptools", - "typing-extensions==4.12.2", "trame==3.6.5", - "trame-vuetify==2.7.1", + "trame-vuetify==3.1.0", "trame-code==1.0.1", "trame-server==3.2.3", - "trame-client==3.5.0", + "trame-client==3.11.2", "trame-simput==2.4.3", - "trame-vtk>=2.8.14", + "trame-vtk==2.10.0", "matplotlib==3.9.4", "trame-matplotlib==2.0.3", "trame-components==2.4.2", + "python-dotenv>=1.2.1", "mpld3<0.5.11", + "paramiko==4.0.0", "xsdata[cli]>=25.4", "xsdata-pydantic[lxml]==24.5", "pyvista==0.45.2", @@ -51,7 +52,7 @@ dependencies = [ "funcy==2.0", "pytz==2025.2", "typing_inspect==0.9.0", - "typing_extensions>=4.12", + "typing_extensions>=4.15.0", "PyYAML", ] @@ -73,7 +74,8 @@ test = [ "pixelmatch==0.3.0", "Pillow==11.0.0", "pytest-mypy==0.10.3", - "pytest-xprocess==1.0.2" + "pytest-xprocess==1.0.2", + "playwright==1.59.0" ] [project.readme] @@ -94,7 +96,7 @@ include-package-data = true # include = ['geos-trame*'] [tool.setuptools.package-data] -"*" = ["*.js", "*.css"] +"*" = ["*.js", "*.css","assets/*","*.jinja","*.json",".env"] [tool.pytest.ini_options] addopts = [ diff --git a/geos-trame/src/geos/trame/app/core.py b/geos-trame/src/geos/trame/app/core.py index 3baf7d387..7452782ec 100644 --- a/geos-trame/src/geos/trame/app/core.py +++ b/geos-trame/src/geos/trame/app/core.py @@ -1,7 +1,8 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. # SPDX-FileContributor: Lionel Untereiner, Jacques Franc - +# ignore context collapsing as it is clearer this way +# ruff: noqa: SIM117 from trame.ui.vuetify3 import VAppLayout from trame.decorators import TrameApp from trame.widgets import html, simput @@ -24,6 +25,9 @@ from geos.trame.app.ui.viewer.viewer import DeckViewer from geos.trame.app.components.alertHandler import AlertHandler +from geos.trame.app.io.simulation import Simulation +from geos.trame.app.ui.simulation_view import define_simulation_view + import sys @@ -38,10 +42,12 @@ def __init__( self, server: Server, file_name: str ) -> None: self.deckEditor: DeckEditor | None = None self.timelineEditor: TimelineEditor | None = None self.deckInspector: DeckInspector | None = None + self.simulationLauncher: Simulation | None = None self.server = server server.enable_module( module ) self.state.input_file = file_name + self.state.user_id = None # TODO handle hot_reload @@ -67,6 +73,9 @@ def __init__( self, server: Server, file_name: str ) -> None: self.region_viewer = RegionViewer() self.well_viewer = WellViewer( 5, 5 ) + ######## Simulation runner + self.simulation = Simulation( server=server ) + # Data loader self.data_loader = DataLoader( self.tree, self.region_viewer, self.well_viewer, trame_server=server ) @@ -177,24 +186,6 @@ def build_ui( self ) -> None: ): vuetify.VIcon( "mdi-content-save-outline" ) - with html.Div( - style= - "height: 100%; width: 300px; display: flex; align-items: center; justify-content: space-between;", - v_if=( "tab_idx == 1", ), - ): - vuetify.VBtn( - "Run", - style="z-index: 1;", - ) - vuetify.VBtn( - "Kill", - style="z-index: 1;", - ) - vuetify.VBtn( - "Clear", - style="z-index: 1;", - ) - # input file editor with vuetify.VCol( v_show=( "tab_idx == 0", ), classes="flex-grow-1 pa-0 ma-0" ): if self.tree.input_file is not None: @@ -208,3 +199,16 @@ def build_ui( self ) -> None: "The file " + self.state.input_file + " cannot be parsed.", file=sys.stderr, ) + + with vuetify.VCol( v_show=( "tab_idx == 1" ), classes="flex-grow-1 pa-0 ma-0" ): + if self.simulation is not None: + define_simulation_view( self.server ) + else: + self.ctrl.on_add_error( + "Error", + "The execution context " + self.state.exec_context + " is not consistent.", + ) + print( + "The execution context " + self.state.exec_context + " is not consistent.", + file=sys.stderr, + ) diff --git a/geos-trame/src/geos/trame/app/io/hpc_tools.py b/geos-trame/src/geos/trame/app/io/hpc_tools.py new file mode 100644 index 000000000..99ffec48a --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/hpc_tools.py @@ -0,0 +1,99 @@ +from geos.trame.app.io.ssh_tools import SimulationConstant + + +class SuggestDecomposition: + + def __init__( self, selected_cluster: SimulationConstant, n_unknowns: int, job_type: str = 'cpu' ) -> None: + """Initialize the decomposition hinter for HPC.""" + self.selected_cluster: SimulationConstant = selected_cluster + self.n_unknowns: int = n_unknowns + self.job_type: str = job_type #TODO should be an enum + self.sd: list[ dict ] = [] + + @staticmethod + def compute( n_unknowns: int, + memory_per_unknown_bytes: int, + node_memory_gb: int, + cores_per_node: int, + min_unknowns_per_rank: int = 10000, + strong_scaling: bool = True ) -> list[ dict ]: + """Suggests node/rank distribution for a cluster computation. + + Parameters: + - n_unknowns: total number of unknowns + - memory_per_unknown_bytes: estimated memory per unknown + - node_memory_gb: available memory per node + - cores_per_node: cores available per node + - min_unknowns_per_rank: minimum for efficiency + - strong_scaling: True if problem size is fixed + + Note: + - 10,000-100,000 unknowns per rank is often a sweet spot for many PDE solvers + - Use power-of-2 decompositions when possible (helps with communication patterns) + - For 3D problems, try to maintain cubic subdomains (minimizes surface-to-volume ratio, reducing communication) + - Don't oversubscribe: avoid using more ranks than provide parallel efficiency + + """ + # Memory constraint + node_memory_bytes = node_memory_gb * 1e9 + max_unknowns_per_node = int( 0.8 * node_memory_bytes / memory_per_unknown_bytes ) + + # Compute minimum nodes needed + min_nodes = max( 1, ( n_unknowns + max_unknowns_per_node - 1 ) // max_unknowns_per_node ) + + # Determine ranks per node + unknowns_per_node = n_unknowns // min_nodes + unknowns_per_rank = max( min_unknowns_per_rank, unknowns_per_node // cores_per_node ) + + # Calculate total ranks needed + n_ranks = max( 1, n_unknowns // unknowns_per_rank ) + + # Distribute across nodes + ranks_per_node = min( cores_per_node, ( n_ranks + min_nodes - 1 ) // min_nodes ) + n_nodes = ( n_ranks + ranks_per_node - 1 ) // ranks_per_node + + return [ + { + 'id': 1, + 'nodes': n_nodes, + 'ranks_per_node': ranks_per_node, + 'total_ranks': n_nodes * ranks_per_node, + 'unknowns_per_rank': n_unknowns // ( n_nodes * ranks_per_node ) + }, + { + 'id': 2, + 'nodes': n_nodes * 2, + 'ranks_per_node': ranks_per_node // 2, + 'total_ranks': n_nodes * ranks_per_node, + 'unknowns_per_rank': n_unknowns // ( n_nodes * ranks_per_node ) + }, + ] + + def get_sd( self ) -> list[ dict ]: + """Get the suggested decomposition popoulated.""" + if self.job_type == 'cpu' and self.selected_cluster: #make it an enum + self.sd = SuggestDecomposition.compute( self.n_unknowns, 64, self.selected_cluster.mem_per_node, + self.selected_cluster.cores_per_node ) + self.sd = [ { + **item, 'label': f"{self.selected_cluster.name} : {item['nodes']} x {item['ranks_per_node']}" + } for item in self.sd ] + else: + self.sd = [ + { + 'id': -1, + 'label': 'No: 0x0', + 'nodes': 0, + 'ranks_per_node': 0, + 'total_ranks': 0, + 'unknowns_per_rank': 0 + }, + ] + # elif job_type == 'gpu': + # selected_cluster['n_nodes']*selected_cluster['gpu']['per_node'] + + return self.sd + + # def to_list( self ) -> list[ str ]: + # """Pretty printer to list of string for display in UI.""" + # sd = self.get_sd() + # return [ f"{self.selected_cluster.name} : {sd_item['nodes']} x {sd_item['ranks_per_node']}" for sd_item in sd ] diff --git a/geos-trame/src/geos/trame/app/io/jinja_t/local_copyback.jinja b/geos-trame/src/geos/trame/app/io/jinja_t/local_copyback.jinja new file mode 100644 index 000000000..8038d5c01 --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/jinja_t/local_copyback.jinja @@ -0,0 +1,15 @@ +#!/bin/sh +#SBATCH --job-name="{{ job_name }}" +#SBATCH --ntasks={{ ntasks }} +#SBATCH --partition={{ partition }} +#SBATCH --comment={{ comment_gr }} +#SBACTH --account={{ account }} +#SBATCH --nodes={{ nodes }} +#SBATCH --time={{ time | default('00:10:00') }} +#SBATCH --mem={{ mem }} +#SBATCH --output=job_GEOS_%j.out +#SBATCH --error=job_GEOS_%j.err +#SBATCH --dependency=afterok:{{ dep_job_id }} + +srun tar cfz {{ dep_job_id }}.tgz Outputs_{{ dep_job_id }}/ log_{{ dep_job_id }}.out +srun mkdir -p {{ target_dl_path }} && mv -v {{ dep_job_id }}.tgz {{ target_dl_path }} \ No newline at end of file diff --git a/geos-trame/src/geos/trame/app/io/jinja_t/local_slurm.jinja b/geos-trame/src/geos/trame/app/io/jinja_t/local_slurm.jinja new file mode 100644 index 000000000..deb4ce7f8 --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/jinja_t/local_slurm.jinja @@ -0,0 +1,27 @@ +#!/bin/sh +#SBATCH --job-name="{{ job_name }}" +#SBATCH --ntasks={{ ntasks }} +#SBATCH --partition={{ partition }} +#SBATCH --comment={{ comment_gr }} +#SBACTH --account={{ account }} +#SBATCH --nodes={{ nodes }} +#SBATCH --time={{ time | default('00:10:00') }} +#SBATCH --mem={{ mem }} +#SBATCH --output=job_GEOS_%j.out +#SBATCH --error=job_GEOS_%j.err + +ulimit -s unlimited +ulimit -c unlimited + +module purge +module use {{ geos_module }} +module load {{ geos_load_list }} + +export HDF5_USE_FILE_LOCKING=FALSE +export OMP_NUM_THREADS=1 +export EXEC={{ geos_path }} + +srun --hint=nomultithread \ + -n {{ ntasks }} ${EXEC} \ + -o Outputs_${SLURM_JOBID} \ + -i {{ input_file | default('geosDeck.xml') }} | tee Outputs_${SLURM_JOBID}/log_${SLURM_JOBID}.out \ No newline at end of file diff --git a/geos-trame/src/geos/trame/app/io/jinja_t/p4_copyback.jinja b/geos-trame/src/geos/trame/app/io/jinja_t/p4_copyback.jinja new file mode 100644 index 000000000..417ac06e9 --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/jinja_t/p4_copyback.jinja @@ -0,0 +1,15 @@ +#!/bin/sh +#SBATCH --job-name="{{ job_name }}" +#SBATCH --ntasks={{ ntasks }} +#SBATCH --partition={{ partition }} +#SBATCH --comment={{ comment_gr }} +#SBACTH --account={{ account }} +#SBATCH --nodes={{ nodes }} +#SBATCH --time={{ time | default('00:10:00') }} +#SBATCH --mem={{ mem }} +#SBATCH --output=job_GEOS_%j.out +#SBATCH --err=job_GEOS_%j.err +#SBATCH --dependency=afterok:{{ dep_job_id }} + +srun tar cfz {{ dep_job_id }}.tgz Outputs_{{ dep_job_id }}/ log_{{ dep_job_id }}.out +srun mkdir -p {{ target_dl_path }} && mv -v {{ dep_job_id }}.tgz {{ target_dl_path }} diff --git a/geos-trame/src/geos/trame/app/io/jinja_t/p4_slurm.jinja b/geos-trame/src/geos/trame/app/io/jinja_t/p4_slurm.jinja new file mode 100644 index 000000000..dd15c075e --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/jinja_t/p4_slurm.jinja @@ -0,0 +1,27 @@ +#!/bin/sh +#SBATCH --job-name="{{ job_name }}" +#SBATCH --ntasks={{ ntasks }} +#SBATCH --partition={{ partition }} +#SBATCH --comment={{ comment_gr }} +#SBACTH --account={{ account }} +#SBATCH --nodes={{ nodes }} +#SBATCH --time={{ time | default('00:10:00') }} +#SBATCH --mem={{ mem }} +#SBATCH --output=job_GEOS_%j.out +#SBATCH --error=job_GEOS_%j.err + +ulimit -s unlimited +ulimit -c unlimited + +module purge +module use {{ geos_module }} +module load {{ geos_load_list }} + +export HDF5_USE_FILE_LOCKING=FALSE +export OMP_NUM_THREADS=1 +export EXEC={{ geos_path }} + +srun --mpi=pmix_v3 --hint=nomultithread \ + -n {{ ntasks }} ${EXEC} \ + -o Outputs_${SLURM_JOBID} \ + -i {{ input_file | default('geosDeck.xml') }} | tee log_${SLURM_JOBID}.out diff --git a/geos-trame/src/geos/trame/app/io/jinja_t/pine_copyback.jinja b/geos-trame/src/geos/trame/app/io/jinja_t/pine_copyback.jinja new file mode 100644 index 000000000..e849c07d2 --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/jinja_t/pine_copyback.jinja @@ -0,0 +1,15 @@ +#!/bin/sh +#SBATCH --job-name="{{ job_name }}" +#SBATCH --ntasks={{ ntasks }} +#SBATCH --partition={{ partition }} +#SBATCH --comment={{ comment_gr }} +#SBATCH --account={{ account }} +#SBATCH --nodes={{ nodes }} +#SBATCH --time={{ time | default('00:10:00') }} +#SBATCH --mem={{ mem }} +#SBATCH --output=job_GEOS_%j.out +#SBATCH --error=job_GEOS_%j.err +#SBATCH --dependency=afterok:{{ dep_job_id }} + +srun tar cfz {{ dep_job_id }}.tgz Outputs_{{ dep_job_id }}/ log_{{ dep_job_id }}.out +srun mkdir -p {{ target_dl_path }} && mv -v {{ dep_job_id }}.tgz {{ target_dl_path }} diff --git a/geos-trame/src/geos/trame/app/io/jinja_t/pine_slurm.jinja b/geos-trame/src/geos/trame/app/io/jinja_t/pine_slurm.jinja new file mode 100644 index 000000000..0c47ae296 --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/jinja_t/pine_slurm.jinja @@ -0,0 +1,28 @@ +#!/bin/sh +#SBATCH --job-name="{{ job_name }}" +#SBATCH --ntasks={{ ntasks }} +#SBATCH --partition={{ partition }} +#SBATCH --comment={{ comment_gr }} +#SBATCH --account={{ account }} +#SBATCH --nodes={{ nodes }} +#SBATCH --time={{ time | default('00:10:00') }} +#SBATCH --mem={{ mem }} +#SBATCH --output=job_GEOS_%j.out +#SBATCH --error=job_GEOS_%j.err + +ulimit -s unlimited +ulimit -c unlimited + +module purge +module use {{ geos_module }} +module load {{ geos_load_list }} + +export HDF5_USE_FILE_LOCKING=FALSE +export OMP_NUM_THREADS=1 +export EXEC={{ geos_path }} + +mkdir -p Outputs_${SLURM_JOBID} && touch log_${SLURM_JOBID}.out +mpirun -mca coll_hcoll_enable 0 -x UCX_RNDV_THRESH=131072 \ + -n {{ ntasks }} ${EXEC} \ + -o Outputs_${SLURM_JOBID} \ + -i {{ input_file | default('geosDeck.xml') }} | tee Outputs_${SLURM_JOBID}/log_${SLURM_JOBID}.out diff --git a/geos-trame/src/geos/trame/app/io/simulation.py b/geos-trame/src/geos/trame/app/io/simulation.py new file mode 100644 index 000000000..e4afac10a --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/simulation.py @@ -0,0 +1,311 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Jacques Franc + +from pathlib import Path +from enum import Enum, unique, auto +from typing import Optional, Any +from trame_server.core import Server + +from geos.trame.app.io.ssh_tools import Authentificator +from geos.trame.app.utils.async_file_watcher import AsyncPeriodicRunner + +from jinja2 import Environment, FileSystemLoader +import paramiko +import re +import os + + +@unique +class SimulationStatus( Enum ): + SCHEDULED = auto() + RUNNING = auto() + COMPLETING = auto() + COPY_BACK = auto() + DONE = auto() + NOT_RUN = auto() + UNKNOWN = auto() + + +@unique +class SlurmJobStatus( Enum ): + PENDING = "PEND" + RUNNING = "R" + COMPLETING = "CG" + COMPLETED = "CD" + SUSPENDED = "S" + UNKNOWN = "UNKNOWN" + + +class Simulation: + """Simulation component. + + Fills the UI with the screenshot as read from the simulation outputs folder and a graph with the time series + from the simulation. + Requires a simulation runner providing information on the output path of the simulation to monitor and ways to + trigger the simulation. + """ + + def __init__( self, server: Server, sim_info_dir: Optional[ Path ] = None ) -> None: + """Initialize the Simulation object with logging and sim triggers among other callbacks.""" + self._server = server + controller = server.controller + self._sim_info_dir = sim_info_dir + server.state.job_ids = [] + server.state.selected_cluster = None + server.state.nunknowns = 1 + + server.state.status_colors = { + "PENDING": "#4CAF50", #PD + "RUNNING": "#3F51B5", #R + "CANCELLED": "#FFC107", #CA + "COMPLETED": "#484B45", #CD + "FAILED": "#E53935", #F + } + self._job_status_watcher: Optional[ AsyncPeriodicRunner ] = None + self._job_status_watcher_period_ms = 2000 + + #define triggers + @controller.trigger( "run_try_login" ) + def run_try_login() -> None: + + # if server.state.key: + Authentificator.ssh_client = Authentificator._create_ssh_client( + Authentificator.get_cluster( server.state.selected_cluster_name ).host, #test + Authentificator.get_cluster( server.state.selected_cluster_name ).port, + server.state.login, + key=Authentificator.get_key( server.state.login, server.state.password, server.state.key_path, + server.state.selected_cluster_name ) ) + + if Authentificator.ssh_client: + server.state.access_granted = True + + @controller.trigger( "run_simulation" ) + def run_simulation() -> None: + + # if server.state.access_granted and server.state.sd and server.state.simulation_xml_filename: + if server.state.access_granted and server.state.simulation_xml_filename and server.state.decomposition: + if Authentificator.ssh_client: + # create remote path + try: + sftp = Authentificator.ssh_client.open_sftp() + sftp.stat( server.state.simulation_remote_path ) + except FileNotFoundError: + import posixpath + jpart = '/' + for part in server.state.simulation_remote_path.split( '/' )[ 1: ]: + try: + jpart = posixpath.join( jpart, part ) + sftp.stat( str( jpart ) ) # exists? + except FileNotFoundError: + sftp.mkdir( str( jpart ) ) + except PermissionError: + print( f"Permission error creating root folder at {jpart}" ) + raise + except: + print( f"Error creating root folder at {jpart}" ) + raise + + # create local path + os.makedirs( server.state.simulation_dl_path, exist_ok=True ) + + Authentificator._sftp_copy_tree( Authentificator.ssh_client, + Simulation.gen_tree( server.state.simulation_xml_filename ), + server.state.simulation_remote_path ) + + cluster_name = Authentificator.get_cluster( server.state.selected_cluster_name ).name + cluster_part = Authentificator.get_cluster( server.state.selected_cluster_name ).partition + cluster_trans_part = Authentificator.get_cluster( + server.state.selected_cluster_name ).partition_transfert + run_id: str = Simulation.render_and_run( + f'{cluster_name}_slurm.jinja', + 'job.slurm', + server, + job_name=server.state.simulation_job_name, + input_file=[ + item for item in server.state.simulation_xml_filename if item.get( 'type' ) == 'text/xml' + ][ 0 ].get( 'name' ), + nodes=server.state.decomposition[ 'nodes' ], + ntasks=server.state.decomposition[ 'nodes' ] * server.state.decomposition[ 'ranks_per_node' ], + geos_module=Authentificator.get_cluster( server.state.selected_cluster_name ).geos_module, + geos_load_list=" ".join( + Authentificator.get_cluster( server.state.selected_cluster_name ).geos_load_list ), + geos_path=Authentificator.get_cluster( server.state.selected_cluster_name ).geos_path, + mem="0", + comment_gr=server.state.slurm_comment, + partition=cluster_part, + account=server.state.slurm_comment ) + + Simulation.render_and_run( f'{cluster_name}_copyback.jinja', + 'copyback.slurm', + server, + job_name=server.state.simulation_job_name, + input_file=[ + item for item in server.state.simulation_xml_filename + if item.get( 'type' ) == 'text/xml' + ][ 0 ].get( 'name' ), + nodes=1, + ntasks=1, + mem="0", + dep_job_id=run_id, + target_dl_path=server.state.simulation_dl_path, + comment_gr=server.state.slurm_comment, + partition=cluster_trans_part, + account=server.state.slurm_comment ) + + self._start_result_streams() + + else: + raise paramiko.SSHException + + @controller.trigger( "kill_all_simulations" ) + def kill_all_simulations() -> None: + # exec scancel jobid + for jobs in server.state.job_ids: + Authentificator.kill_job( jobs[ 'job_id' ] ) + + def __del__( self ) -> None: + """Clean up running streams on destruction.""" + self._stop_result_streams() + + def set_status_watcher_period_ms( self, period_ms: int ) -> None: + """Set the watcher period in ms.""" + self._job_status_watcher_period_ms = period_ms + if self._job_status_watcher: + self._job_status_watcher.set_period_ms( period_ms ) + + def _stop_result_streams( self ) -> None: + if self._job_status_watcher is not None: + self._job_status_watcher.stop() + + def _start_result_streams( self ) -> None: + self._stop_result_streams() + self._job_status_watcher = AsyncPeriodicRunner( self.check_jobs, period_ms=self._job_status_watcher_period_ms ) + + def check_jobs( self ) -> None: + """Check on running jobs and update their names and progresses.""" + if Authentificator.ssh_client: + jid = self._server.state.job_ids + for index, job in enumerate( jid ): + job_id = job[ 'job_id' ] + _, sout, _ = Authentificator._execute_remote_command( + Authentificator.ssh_client, f'sacct -j {job_id} -o JobID,JobName,State --noheader' ) + job_line = sout.strip().split( "\n" )[ -1 ] + + jid[ index ][ 'status' ] = job_line.split()[ 2 ] + jid[ index ][ 'name' ] = job_line.split()[ 1 ] + + if ( jid[ index ][ 'status' ] == 'RUNNING' ): + _, sout, _ = Authentificator._execute_remote_command( + Authentificator.ssh_client, + f"sacct -j {job_id} -o ElapsedRaw,TimelimitRaw --noheader --parsable2 | head -n 1 " ) + progress_line = sout.strip().split( "|" ) + jid[ index ][ 'slprogress' ] = str( + float( progress_line[ 0 ] ) / float( progress_line[ 1 ] ) / 60 * 100 ) + + # getthe completed status + pattern = re.compile( r'\((\d+(?:\.\d+)?)%\s*completed\)' ) + _, sout, _ = Authentificator._execute_remote_command( + Authentificator.ssh_client, + f"grep \"completed\" {self._server.state.simulation_remote_path}/job_GEOS_{job_id}.out | tail -1" + ) + m = pattern.search( sout.strip() ) + if m: + jid[ index ][ 'simprogress' ] = str( m.group( 1 ) ) + + print( + f"{job_line}-{job_id}\n job id:{jid[index]['job_id']}\n status:{jid[index]['status']}\n name:{jid[index]['name']} \n --- \n" + ) + self._server.state.job_ids = jid + self._server.state.dirty( "job_ids" ) + self._server.state.flush() + + return None + + @staticmethod + def render_and_run( template_name: str, dest_name: str, server: Server, **kwargs: Any ) -> str: + """Render the slurm template and run it. Return it job_id.""" + if server.state.access_granted and server.state.simulation_xml_filename: + template = Environment( + loader=FileSystemLoader( f'{os.getenv("TEMPLATE_DIR")}' ) ).get_template( template_name ) + rendered = template.render( kwargs ) + + if Authentificator.ssh_client: + #write slurm directly on remote + try: + sftp = Authentificator.ssh_client.open_sftp() + remote_path = Path( server.state.simulation_remote_path ) / Path( dest_name ) + with sftp.file( str( remote_path ), 'w' ) as f: + f.write( rendered ) + + except PermissionError as e: + print( f"Permission error: {e}" ) + except IOError as e: + print( f"Error accessing remote file or path: {e}" ) + except Exception as e: + print( f"An error occurred during SFTP: {e}" ) + + _, sout, _ = Authentificator._execute_remote_command( + Authentificator.ssh_client, f'cd {server.state.simulation_remote_path} && sbatch {dest_name}' ) + job_lines = sout.strip() + job_id = re.search( r"Submitted batch job (\d+)", job_lines ) + if job_id: + server.state.job_ids.append( { 'job_id': job_id.group( 1 ) } ) + return job_id.group( 1 ) + else: + return "-1" + else: + return "-1" + else: + return "-1" + + @staticmethod + def gen_tree( xml_filename: Any ) -> dict: + """Generate file tree to be copied on remote from files uploaded.""" + import re + xml_pattern = re.compile( r"\.xml$", re.IGNORECASE ) + mesh_pattern = re.compile( r"\.(vtu|vtm|pvtu|pvtm)$", re.IGNORECASE ) + table_pattern = re.compile( r"\.(txt|dat|csv|geos)$", re.IGNORECASE ) + xml_matches = [] + mesh_matches = [] + table_matches = [] + + pattern_file = r"[\w\-.]+\.(?:vtu|pvtu|dat|txt|xml|geos)\b" # all files + pattern_xml_path = r"\"(.*/)*([\w\-.]+\.(?:xml))\b" + pattern_mesh_path = r"\"(.*/)*([\w\-.]+\.(?:vtu|pvtu|vtm|pvtm))\b" + pattern_table_curly_path = r"((?:[\w\-/]+/)+)*([\w\-.]+\.(?:geos|csv|dat|txt))" + + for file in xml_filename: + if xml_pattern.search( file.get( "name", "" ) ): + xml_matches.append( file ) + elif mesh_pattern.search( file.get( "name", "" ) ): + mesh_matches.append( file ) + elif table_pattern.search( file.get( "name", "" ) ): + table_matches.append( file ) + + #assume the first XML is the main xml + xml_expected_file_matches = re.findall( pattern_file, xml_matches[ 0 ][ 'content' ].decode( "utf-8" ) ) + + #TODO all the needed files + test_assert = { item.get( "name" ) for item in xml_filename }.intersection( set( xml_expected_file_matches ) ) + assert test_assert + + decoded = re.sub( pattern_xml_path, r'"\2', xml_matches[ 0 ][ 'content' ].decode( "utf-8" ) ) + decoded = re.sub( pattern_mesh_path, r'"mesh/\2', decoded ) + decoded = re.sub( pattern_table_curly_path, r"tables/\2", decoded ) + + xml_matches[ 0 ][ 'content' ] = decoded.encode( "utf-8" ) + + FILE_TREE = { + 'root': '.', + "structure": { + "files": xml_matches, + "subfolders": { + "mesh": mesh_matches, + "tables": table_matches + } + } + } + + print( f"Generated FILE_TREE: {FILE_TREE}" ) + return FILE_TREE diff --git a/geos-trame/src/geos/trame/app/io/ssh_tools.py b/geos-trame/src/geos/trame/app/io/ssh_tools.py new file mode 100644 index 000000000..1be0749d7 --- /dev/null +++ b/geos-trame/src/geos/trame/app/io/ssh_tools.py @@ -0,0 +1,303 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Jacques Franc + +from typing import Optional +from pathlib import Path +import paramiko +import os +import json +from dataclasses import dataclass + + +@dataclass +class SimulationConstant: + name: str + host: str + partition: str + partition_transfert: str + port: int + geos_path: str + geos_module: str + geos_load_list: list + remote_home_base: str # for ssh key + simulation_default_filename: str + simulation_remote_path: str + simulation_dl_default_path: str + simulation_information_default_path: str + n_nodes: int + cores_per_node: int + mem_per_node: int + + +#If proxyJump are needed +# +# proxy_cmd = "ssh -W {host}:{port} proxyuser@bastion.example.com".format( +# host=ssh_host, port=ssh_port +# ) +# from paramiko import ProxyCommand +# sock = ProxyCommand(proxy_cmd) + +# client = paramiko.SSHClient() +# client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +# client.connect( +# hostname=ssh_host, +# port=ssh_port, +# username=username, +# key_filename=keyfile, +# sock=sock, # <— tunnel created by ProxyCommand +# ) + + +class Authentificator: + + ssh_client: Optional[ paramiko.SSHClient ] = None + + sim_constants: list = [] + + @staticmethod + def reload_simconstants() -> None: + """Reload the cluster configuration from cluster.json file.""" + Authentificator.sim_constants = [ + SimulationConstant( **item ) + for item in json.load( open( f'{os.getenv("ASSETS_DIR")}/cluster.json', 'r' ) ) # noqa: SIM115 + ] + + @staticmethod + def get_cluster( name: str ) -> Optional[ SimulationConstant ]: + """Return the structured meta for cluster selected.""" + match = next( ( item for item in Authentificator.sim_constants if item.name == name ), None ) + return match + + @staticmethod + def _sftp_copy_tree( ssh_client: paramiko.SSHClient, file_tree: dict, remote_root: str ) -> None: + """Copy the file tree at remote root using ssh_client.""" + # Connect to remote server + sftp = ssh_client.open_sftp() + + Authentificator.dfs_tree( file_tree[ "structure" ], file_tree[ "root" ], sftp=sftp, remote_root=remote_root ) + + sftp.close() + + @staticmethod + def dfs_tree( node: list | dict, path: str, sftp: paramiko.SFTPClient, remote_root: str ) -> None: + """Create the tree represented by node at local path in remote pointed by sftp client at remote_root.""" + if path is None or remote_root is None: + return # type:ignore[unreachable] + + lp = Path( path ) + rp = Path( remote_root ) / lp + + if isinstance( node, list ): + for file in node: + print( f"copying {lp/Path(file.get('name'))} to {rp/Path(file.get('name'))}" ) + try: + with sftp.file( str( rp / Path( file.get( 'name' ) ) ), 'w' ) as f: + f.write( file.get( 'content' ) ) + except: + print( f"Error copying {lp/Path(file.get('name'))} to {rp/Path(file.get('name'))}" ) + raise + elif isinstance( node, dict ): + if "files" in node: + files = node[ 'files' ] + for file in files: + print( f"copying {lp/Path(file.get('name'))} to {rp/Path(file.get('name'))}" ) + try: + with sftp.file( str( rp / Path( file.get( 'name' ) ) ), 'w' ) as f: + f.write( file.get( 'content' ) ) + except: + print( f"Error copying {lp/Path(file.get('name'))} to {rp/Path(file.get('name'))}" ) + raise + if "subfolders" in node: + for subfolder, content in node[ "subfolders" ].items(): + try: + sftp.stat( str( rp / Path( subfolder ) ) ) + except FileNotFoundError: + try: + print( f"creating {rp/Path(subfolder)}" ) + sftp.mkdir( str( rp / Path( subfolder ) ) ) + except: + print( f"Error creating {rp/Path(subfolder)} on remote." ) + raise + Authentificator.dfs_tree( content, str( lp / Path( subfolder ) ), sftp, remote_root ) + + for folder, content in node.items(): + if folder not in [ "files", "subfolders" ]: + try: + sftp.stat( str( rp / Path( folder ) ) ) + except FileNotFoundError: + print( f"creating {rp/Path(folder)}" ) + try: + sftp.mkdir( str( rp / Path( folder ) ) ) + except: + print( f"Error creating {rp/Path(subfolder)} on remote." ) + raise + Authentificator.dfs_tree( content, str( lp / Path( folder ) ), sftp, remote_root ) + + @staticmethod + def kill_job( id: int ) -> None: + """Cancel job identified by id in slurm schedulder.""" + if Authentificator.ssh_client: + Authentificator._execute_remote_command( Authentificator.ssh_client, f"scancel {id}" ) + return None + + @staticmethod + def get_key( id: str, pword: str, key_path: str, cluster_name: str ) -> paramiko.RSAKey: + """Return the ssh key if found or create and dispatch one.""" + try: + PRIVATE_KEY = paramiko.RSAKey.from_private_key_file( key_path ) + return PRIVATE_KEY + except paramiko.SSHException as e: + print( f"Error loading private key: {e}\n" ) + raise paramiko.SSHException( f"Error loading private key: {e}\n" ) from e + except FileNotFoundError as e: + print( f"Private key not found: {e}\n Generating key at ... {key_path}" ) + PRIVATE_KEY = Authentificator.gen_key( key_path ) + temp_client = paramiko.SSHClient() + temp_client.set_missing_host_key_policy( paramiko.AutoAddPolicy() ) + + clusterByName = Authentificator.get_cluster( cluster_name ) + if clusterByName is None: + raise ValueError( f"Cluster '{cluster_name}' not found in configuration." ) from e + + host = clusterByName.host + port = clusterByName.port + temp_client.connect( host, port, username=id, password=pword, timeout=10 ) + remote_base = clusterByName.remote_home_base + Authentificator._transfer_file_sftp( temp_client, f"{key_path.split('/')[-1]}.pub", + f"{remote_base}/{id}/.ssh/{key_path.split('/')[-1]}.pub" ) + Authentificator._execute_remote_command( + temp_client, + f" cat {remote_base}/.ssh/{key_path.split('/')[-1]}.pub | tee -a {clusterByName.remote_home_base}/.ssh/authorized_keys" + ) + + return PRIVATE_KEY + + @staticmethod + def gen_key( key_path: str ) -> paramiko.RSAKey: + """Generate RSAKey for SSH protocol.""" + # home = os.environ.get( "HOME" ) + # file_path = f"{home}/.ssh/id_trame" + key = paramiko.RSAKey.generate( bits=4096 ) + key.write_private_key_file( key_path ) + + # Get public key in OpenSSH format + public_key = f"{key.get_name()} {key.get_base64()}" + with open( key_path + ".pub", "w" ) as pub_file: + pub_file.write( public_key ) + + suffix = key_path.split( '/' )[ -1 ] + print( f"SSH key pair generated: {suffix} (private), {suffix}.pub (public)" ) + + return key + + @staticmethod + def _create_ssh_client( host: str, + port: int, + username: str, + password: str | None = None, + key: paramiko.RSAKey | None = None ) -> paramiko.SSHClient | None: + """Initializes and returns an SSH client connection. + + Uses context manager for automatic cleanup. + """ + client = paramiko.SSHClient() + # Automatically adds the hostname and new host keys to the host files (~/.ssh/known_hosts) + client.set_missing_host_key_policy( paramiko.AutoAddPolicy() ) + + try: + print( f"Connecting to {host} using key-based authentication..." ) + client.connect( host, port, username, pkey=key, timeout=10 ) + + return client + except paramiko.AuthenticationException: + print( "Authentication failed. Check your credentials or key." ) + return None + except paramiko.SSHException as e: + print( f"Could not establish SSH connection: {e}" ) + return None + except Exception as e: + print( f"An unexpected error occurred: {e}" ) + return None + + @staticmethod + def _execute_remote_command( client: paramiko.SSHClient, command: str ) -> tuple[ int, str, str ]: + """Executes a single command on the remote server and prints the output.""" + if not client: + return ( -1, "", "" ) + + print( f"\n--- Executing Command: '{command}' ---" ) + try: + # Executes the command. stdin, stdout, and stderr are file-like objects. + # Ensure command ends with a newline character for some shell environments. + stdin, stdout, stderr = client.exec_command( command ) + + # Wait for the command to finish and read the output + exit_status = stdout.channel.recv_exit_status() + + # Print standard output + stdout_data = stdout.read().decode().strip() + if stdout_data: + print( "STDOUT:" ) + print( stdout_data ) + + # Print standard error (if any) + stderr_data = stderr.read().decode().strip() + if stderr_data: + print( "STDERR:" ) + print( stderr_data ) + + print( f"Command exited with status: {exit_status}" ) + return ( exit_status, stdout_data, stderr_data ) + + except PermissionError as e: + print( f"Permission error: {e}" ) + return ( -1, "", "" ) + except IOError as e: + print( f"Error accessing remote file or path: {e}" ) + return ( -1, "", "" ) + except Exception as e: + print( f"An error occurred during SFTP: {e}" ) + return ( -1, "", "" ) + + @staticmethod + def _transfer_file_sftp( client: paramiko.SSHClient, + local_path: str, + remote_path: str, + direction: str = "put" ) -> Optional[ bool ]: + """Transfers a file using SFTP (Secure File Transfer Protocol). + + Direction can be 'put' (upload) or 'get' (download). + """ + if not client: + return None + + print( f"\n--- Starting SFTP Transfer ({direction.upper()}) ---" ) + + try: + # Establish an SFTP connection session + sftp = client.open_sftp() + + if direction == "put": + print( f"Uploading '{local_path}' to '{remote_path}'..." ) + sftp.put( local_path, remote_path ) + print( "Upload complete." ) + elif direction == "get": + print( f"Downloading '{remote_path}' to '{local_path}'..." ) + sftp.get( remote_path, local_path ) + print( "Download complete." ) + else: + print( "Invalid transfer direction. Use 'put' or 'get'." ) + + sftp.close() + return True + + except FileNotFoundError: + print( f"Error: Local file '{local_path}' not found." ) + return False + except IOError as e: + print( f"Error accessing remote file or path: {e}" ) + return False + except Exception as e: + print( f"An error occurred during SFTP: {e}" ) + return False diff --git a/geos-trame/src/geos/trame/app/main.py b/geos-trame/src/geos/trame/app/main.py index 2ad3b293a..810ef46d5 100644 --- a/geos-trame/src/geos/trame/app/main.py +++ b/geos-trame/src/geos/trame/app/main.py @@ -1,13 +1,17 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. -# SPDX-FileContributor: Lionel Untereiner +# SPDX-FileContributor: Lionel Untereiner, Jacques Franc from pathlib import Path from typing import Any +from dotenv import load_dotenv +import os from trame.app import get_server # type: ignore from trame_server import Server +#do not override if existing from geos.trame.app.core import GeosTrame +from geos.trame.app.io.ssh_tools import Authentificator def main( server: Server = None, **kwargs: Any ) -> None: @@ -24,13 +28,20 @@ def main( server: Server = None, **kwargs: Any ) -> None: # parse args parser = server.cli - parser.add_argument( "-I", "--input", help="Input file (.xml)" ) + parser.add_argument( "-I", "--input", help="Input file (.xml)", required=True ) + parser.add_argument( "-e", "--env", help="dot_env file", required=False ) ( args, _unknown ) = parser.parse_known_args() - if args.input is None: - print( "Usage: \n\tgeos-trame -I /path/to/input/file" ) - return + if args.env: + assert load_dotenv( dotenv_path=Path( args.env ) ) + else: + assert load_dotenv( dotenv_path=Path( __file__ ).parent.parent / "assets/.env" ) + + Authentificator.reload_simconstants() + + print( f"TEMPLATE_DIR .. {os.getenv('TEMPLATE_DIR')}" ) + print( f"ASSETS_DIR .. {os.getenv('ASSETS_DIR')}" ) file_name = str( Path( args.input ).absolute() ) diff --git a/geos-trame/src/geos/trame/app/ui/plotting.py b/geos-trame/src/geos/trame/app/ui/plotting.py index 1ca2bbb9f..790203c27 100644 --- a/geos-trame/src/geos/trame/app/ui/plotting.py +++ b/geos-trame/src/geos/trame/app/ui/plotting.py @@ -57,7 +57,7 @@ def _figure_size( self ) -> dict: "dpi": dpi, } - def _permeability( self, **kwargs: Any ) -> Figure: + def _permeability( self, **kwargs: Any ) -> type[ Figure ]: # read data assert self.source.input_file is not None if self.source.input_file is None: diff --git a/geos-trame/src/geos/trame/app/ui/simulation_view.py b/geos-trame/src/geos/trame/app/ui/simulation_view.py new file mode 100644 index 000000000..627b47578 --- /dev/null +++ b/geos-trame/src/geos/trame/app/ui/simulation_view.py @@ -0,0 +1,410 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Jacques Franc +# ignore context collapsing as it is clearer this way +# ruff: noqa: SIM117 +from typing import Any + +from trame.widgets import html +from trame.widgets import vuetify3 as vuetify +from trame_server import Server + +from geos.trame.app.io.simulation import Authentificator +from geos.trame.app.io.hpc_tools import SuggestDecomposition + +#rough estimate of n unknowns would be better from GEOS's dry-run +# unknowns (oncell,onpoint) +# for now do not take into account wells as dep on the num of wells (neg vs matrix elmts) +# for now do not take into account frac as dep on the num of frac elmts (prob neg vs matrix elmts) +solvers_to_unknowns = { + "CompositionalMultiphaseFVM": ( 3, 0 ), + "CompositionalMultiphaseHybridFVM": ( 4, 0 ), + "CompositionalMultiphaseReservoirPoromechanics": ( 3, 3 ), + "CompositionalMultiphaseReservoirPoromechanicsConformingFractures": ( 3, 6 ), + "CompositionalMultiphaseWell": ( 3, 0 ), + "ElasticFirstOrderSEM": ( 0, 3 ), + "ElasticSEM": ( 0, 3 ), + "ImmiscibleMultiphaseFlow": ( 3, 0 ), + "LaplaceFEM": ( 0, 3 ), + "MultiphasePoromechanics": ( 3, 3 ), + "MultiphasePoromechanicsReservoir": ( 3, 3 ), #?? + "MultiphasePoromechanicsConformingFractures": ( 3, 6 ), + "SinglePhaseFVM": ( 2, 0 ), + "SinglePhaseHybridFVM": ( 3, 0 ), + "SinglePhasePoromechanics": ( 2, 3 ), + "SinglePhasePoromechanicsConformingFractures": ( 2, 3 ), + "SinglePhasePoromechanicsConformingFracturesALM": ( 2, 3 ), + "SinglePhaseWell": ( 2, 0 ), + "SolidMechanicsEmbeddedFractures": ( 0, 3 ), + "SolidMechanicsAugmentedLagrangianContact": ( 0, 3 ), + "SolidMechanicsLagrangeContact": ( 0, 3 ), + "SolidMechanicsLagrangeContactBubbleStab": ( 0, 3 ), + "SolidMechanicsLagrangianFEM": ( 0, 3 ) +} + + +# helpers +def _what_solver( bcontent: dict ) -> tuple[ int, int ]: + from xml.etree.ElementTree import Element, fromstring + sim_xml: Element = fromstring( bcontent[ 'content' ] ) + solver = sim_xml.find( 'Solvers' ) + nunk: list[ tuple[ int, int ] ] = [ solvers_to_unknowns.get( elt.tag, ( 1, 0 ) ) + for elt in solver ] if solver else [ ( 0, 0 ) ] + return max( nunk ) + + +def _how_many_cells( bcontent: dict ) -> tuple[ int, int ]: + import vtk + name = bcontent[ 'name' ] + if name.endswith( ".vtp" ): + reader = vtk.vtkXMLPolyDataReader() + elif name.endswith( ".vtu" ): + reader = vtk.vtkXMLUnstructuredGridReader() + elif name.endswith( ".vtm" ): + reader = vtk.vtkXMLMultiBlockDataReader() + else: + raise ValueError( "Unsupported kind (use 'vtp', 'vtu', or 'vtm')." ) + + reader.SetReadFromInputString( 1 ) + reader.SetInputString( bcontent[ 'content' ] ) + reader.Update() + output = reader.GetOutput() + return ( output.GetNumberOfCells(), output.GetNumberOfPoints() ) + + +def _has_internalMesh( bcontent: dict ) -> bool: + from xml.etree.ElementTree import Element, fromstring + sim_xml: Element = fromstring( bcontent[ 'content' ] ) + return bool( sim_xml.find( 'Mesh/InternalMesh' ) is not None ) + + +def _what_internalMesh( bcontent: dict ) -> tuple[ int, int ]: + from xml.etree.ElementTree import Element, fromstring + import re + sim_xml: Element = fromstring( bcontent[ 'content' ] ) + + mesh = sim_xml.find( 'Mesh/InternalMesh' ) + + def _parse_sum( value: str | None ) -> int: + if value is None: + return 0 + return sum( int( el ) for el in re.findall( r'-?\d+(?:\.\d+)?', value ) ) + + if mesh is None: + nx = ny = nz = 0 + else: + nx = _parse_sum( mesh.get( 'nx' ) ) + ny = _parse_sum( mesh.get( 'ny' ) ) + nz = _parse_sum( mesh.get( 'nz' ) ) + + return ( nx * ny * nz, ( nx + 1 ) * ( ny + 1 ) * ( nz + 1 ) ) + + +#TODO a class from it +def define_simulation_view( server: Server ) -> None: + """Functional definition of UI elements.""" + + @server.state.change( "other_widget_selected_file" ) + def on_other_widget_file_ready( other_widget_selected_file: dict, **_: Any ) -> None: + if not other_widget_selected_file: + return + + current = list( server.state.simulation_xml_filename ) + existing_names = { f.get( "name" ) for f in current } + + if other_widget_selected_file.get( "name" ) not in existing_names: + current.append( other_widget_selected_file ) + server.state.simulation_xml_filename = current + + @server.state.change( "selected_cluster_name" ) + def on_cluster_change( selected_cluster_name: str, **_: Any ) -> None: + print( f"selecting {selected_cluster_name}" ) + cluster = Authentificator.get_cluster( selected_cluster_name ) + if cluster is None: + print( f"Error: Cluster '{selected_cluster_name}' not found in configuration." ) + return + + server.state.decompositions = SuggestDecomposition( cluster, server.state.nunknowns ).get_sd() + + server.state.simulation_remote_path = cluster.simulation_remote_path + + server.state.simulation_dl_path = cluster.simulation_dl_default_path + + # @server.state.change( "decomposition" ) + # def on_decomposition_selected( decomposition: str, **_: Any ) -> None: + # = SuggestDecomposition( Authentificator.get_cluster( server.state.selected_cluster_name ), server.state.nunknowns ).get_sd() + # # if server.state.decomposition: + # except: + # server.state.sd = { 'nodes': 0, 'total_ranks': 0 } + + @server.state.change( "simulation_xml_temp" ) + def on_temp_change( simulation_xml_temp: list, **_: Any ) -> None: + current_list = server.state.simulation_xml_filename + new_list = current_list + simulation_xml_temp + + server.state.simulation_xml_filename = new_list + server.state.simulation_xml_temp = [] + + @server.state.change( "nunknowns" ) + def on_nunknowns_change( nunknowns: int, **_: Any ) -> None: + #re-gen list + if len( server.state.decompositions ) > 0: + server.state.decompositions = SuggestDecomposition( + Authentificator.get_cluster( server.state.selected_cluster_name ), nunknowns ).get_sd() + print( f'unknowns changed : {server.state.nunknowns} -> {nunknowns}' ) + server.state.nunknowns = nunknowns + + @server.state.change( "simulation_xml_filename" ) + def on_simfiles_change( simulation_xml_filename: list, **_: Any ) -> None: + import re + has_xml = [ file.get( "type", "" ) == 'text/xml' for file in simulation_xml_filename ] + + has_external_mesh = [ + bool( file.get( "name", "" ).endswith( ( ".vtu", ".vtm", ".vtp" ) ) ) for file in simulation_xml_filename + ] + + has_internal_mesh = False + for i, _ in enumerate( has_xml ): + if has_xml[ i ]: + has_internal_mesh = _has_internalMesh( simulation_xml_filename[ i ] ) + + if any( has_xml ): + uc = up = nc = np = 0 + # compute unknowns and cells only for xml files, if external mesh do not take into account internal mesh info even if present, if no external mesh try to take into account internal mesh info if present + # useful for decomposition suggestion + for i, _ in enumerate( has_xml ): + if has_external_mesh[ i ]: + nc, np = _how_many_cells( simulation_xml_filename[ i ] ) + elif has_xml[ i ]: + uc, up = _what_solver( simulation_xml_filename[ i ] ) + if has_internal_mesh: + nc, np = _what_internalMesh( simulation_xml_filename[ i ] ) + + if all( i is not None for i in ( uc, nc, up, np ) ): + server.state.nunknowns = uc * nc + up * np + + if any( has_xml ): + xml_pattern = re.compile( r"\.xml$", re.IGNORECASE ) + mesh_pattern = re.compile( r"\.(vtu|vtm|pvtu|pvtm)$", re.IGNORECASE ) + table_pattern = re.compile( r"\.(txt|dat|csv|geos)$", re.IGNORECASE ) + + xml_matches, mesh_matches, table_matches = [], [], [] + + pattern_file = r"[\w\-.]+\.(?:vtu|pvtu|dat|txt|xml|geos)\b" + + # Fix: use enumerate instead of .index() to handle duplicates safely + for i, file in enumerate( simulation_xml_filename ): + if not has_xml[ i ]: + continue + name = file.get( "name", "" ) + if xml_pattern.search( name ): + xml_matches.append( file ) + elif mesh_pattern.search( name ): + mesh_matches.append( file ) + elif table_pattern.search( name ): + table_matches.append( file ) + + if xml_matches: + already_have = { file.get( "name", "" ) for file in simulation_xml_filename } + required = set( re.findall( pattern_file, xml_matches[ 0 ][ 'content' ].decode( "utf-8" ) ) ) + required -= already_have + # Fix: store as list of dicts so the UI can use {{ file.name }} + server.state.simulation_xml_required = [ { "name": f } for f in sorted( required ) ] + else: + server.state.simulation_xml_required = [] + + server.state.is_valid_jobfiles = any( has_xml ) + server.state.all_req_files = any( has_xml ) and len( server.state.simulation_xml_required ) == 0 + + def kill_job( index_to_remove: int ) -> None: + # for now just check there is an xml + jid = list( server.state.job_ids ) + if 0 <= index_to_remove < len( jid ): + removed_id = jid[ index_to_remove ][ 'job_id' ] + Authentificator.kill_job( removed_id ) + del jid[ index_to_remove ] + + server.state.job_ids = jid + print( f"Job {removed_id} kill. Still running: {len(jid)}" ) + else: + print( f"Error: supress index does not exist ({index_to_remove})." ) + + def run_remove_jobfile( index_to_remove: int ) -> None: + current_files = list( server.state.simulation_xml_filename ) + if 0 <= index_to_remove < len( current_files ): + del current_files[ index_to_remove ] + + server.state.simulation_xml_filename = current_files + print( f"File at {index_to_remove} deleted. New files: {len(current_files)}" ) + else: + print( f"Erreur: Wrong deletion index ({index_to_remove})." ) + + with vuetify.VContainer(): + with vuetify.VRow(): + with vuetify.VCol( cols=4 ): + vuetify.VTextField( v_model=( + "login", + None, + ), + label="Login", + dense=True, + hide_details=True, + clearable=True, + prepend_icon="mdi-login" ) + with vuetify.VCol( cols=4 ): + vuetify.VTextField( v_model=( + "password", + None, + ), + label="Password", + type="password", + dense=True, + hide_details=True, + clearable=True, + prepend_icon="mdi-onepassword" ) + + # + server.state.access_granted = False + server.state.is_valid_jobfiles = False + server.state.simulation_xml_filename = [] + server.state.simulation_xml_required = [] + server.state.selected_cluster_names = [ cluster.name for cluster in Authentificator.sim_constants ] + # server.state.decompositions = [] + + # --------------------------- auth block -----------------------# + vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" ) + with vuetify.VCol( cols=1 ): + vuetify.VSelect( label="Cluster", + items=( "selected_cluster_names", ), + v_model=( "selected_cluster_name", 'p4' ) ) + vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" ) + with vuetify.VCol( cols=1 ): + vuetify.VSelect( label="Decomposition", + items=( "decompositions", [] ), + v_model=( "decomposition", None ), + item_title="label", + item_value="id", + return_object=True ) + + with vuetify.VRow(): + with vuetify.VCol( cols=8 ): + vuetify.VTextField( v_model=( + "key_path", + "/users/$USER/.ssh/id_trame", + ), + label="Path to ssh key", + dense=True, + hide_details=True, + clearable=True, + prepend_icon="mdi-key-chain-variant" ) + + # + vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" ) + with vuetify.VCol( cols=1 ): + vuetify.VBtn( "Log in", click="trigger('run_try_login')", + disabled=( "access_granted", ) ) # type: ignore + # + vuetify.VDivider( vertical=True, thickness=5, classes="mx-4" ) + with vuetify.VCol( cols=1 ): + vuetify.VTextField( + v_model=( "slurm_comment", None ), + label="Comment to slurm", + dense=True, + hide_details=True, + clearable=True, + ) # type: ignore + + # --------------------------- simulation block -----------------------# + vuetify.VDivider( thickness=5, classes="my-4" ) + + with vuetify.VRow(): + with vuetify.VCol( cols=4 ): + vuetify.VFileUpload( + v_model=( "simulation_xml_temp", [] ), + title="Simulation file name", + density='comfortable', + hide_details=True, + # clearable=True, + multiple=True, + filter_by_type='.xml,.vtu,.vtm,.pvtu,.pvtm,.dat,.csv,.txt,.geos,.vtk', + # readonly=True, + disabled=( "!access_granted", ) ) + with vuetify.VCol( cols=4 ), vuetify.VList(): + with vuetify.VListItem( v_for=( "(file,i) in simulation_xml_filename" ), + key="i", + value="file", + prepend_icon="mdi-minus-circle-outline", + click=( run_remove_jobfile, "[i]" ) ): + vuetify.VListItemTitle( "{{ file.name }}" ) + vuetify.VListItemSubtitle( "{{ file.size ? (file.size / 1024).toFixed(1) + ' KB' : 'URL' }}" ) + vuetify.VDivider( thickness=2, classes="my-2" ) + + with vuetify.VListItem( + v_for=( "(file,i) in simulation_xml_required" ), + key="i", + value="file", + classes="bg-red-lighten-4 text-red-darken-4", + # base_color="red-lighten-4", + # style="background-color: rgb(var(--v-theme-error-lighten-4));", + prepend_icon="mdi-alert-circle-outline" ): + vuetify.VListItemTitle( "{{ file.name }} (required)" ) + + with vuetify.VRow(), vuetify.VCol(): + vuetify.VTextField( v_model=( "simulation_remote_path", None ), + label="Path where to write files and launch code", + prepend_icon="mdi-upload", + dense=True, + hide_details=True, + clearable=True, + disabled=( "!access_granted", ) + # TODO callback validation of path + ) + + with vuetify.VRow(), vuetify.VCol(): + vuetify.VTextField( v_model=( "simulation_dl_path", None ), + label="Simulation download path", + dense=True, + clearable=True, + prepend_icon="mdi-download", + disabled=( "!access_granted", ) + # TODO callback validation of path + ) + + with vuetify.VRow(): + with vuetify.VCol( cols=4 ): + vuetify.VTextField( v_model=( "simulation_job_name", "geosJob" ), + label="Job Name", + dense=True, + hide_details=True, + clearable=True, + disabled=( "!access_granted", ) ) + + vuetify.VSpacer() + with vuetify.VCol( cols=1 ): + vuetify.VBtn( "Run", + click="trigger('run_simulation')", + disabled=( "!is_valid_jobfiles || !all_req_files", ), + classes="ml-auto" ), # type: ignore + + # ------------------------------- Status block ----------------------------- # + vuetify.VDivider( thickness=5, classes="my-4" ) + + with vuetify.VRow(): + vuetify.VSpacer() + with vuetify.VCol( cols=1 ): + vuetify.VBtn( "Kill All", click="trigger('kill_all_simulations')" ), # type: ignore + + color_expression = "status_colors[job_ids[i].status] || '#607D8B'" + with vuetify.VRow(), vuetify.VCol( cols=4 ), vuetify.VList(): + with vuetify.VListItem( v_for=( "(jobs,i) in job_ids" ), + key="i", + value="jobs", + base_color=( color_expression, ), + prepend_icon="mdi-minus-circle-outline", + click=( kill_job, "[i]" ) ): + vuetify.VListItemTitle( "{{ jobs.status }} -- {{ jobs.name }} -- {{ jobs.job_id }}" ) + vuetify.VProgressLinear( v_model=( "jobs.simprogress", "0" ), ) + vuetify.VProgressLinear( v_model=( "jobs.slprogress", "0" ), ) + + with vuetify.VRow( v_if="simulation_error" ): + html.Div( "An error occurred while running simulation :
{{simulation_error}}", style="color:red;" ) diff --git a/geos-trame/src/geos/trame/app/ui/viewer/boxViewer.py b/geos-trame/src/geos/trame/app/ui/viewer/boxViewer.py index e51f5035f..4b5310e15 100644 --- a/geos-trame/src/geos/trame/app/ui/viewer/boxViewer.py +++ b/geos-trame/src/geos/trame/app/ui/viewer/boxViewer.py @@ -4,6 +4,7 @@ import pyvista as pv from geos.trame.schema_generated.schema_mod import Box +from typing import Any import re @@ -102,7 +103,7 @@ def _compute_intersected_cell( self ) -> None: if len( saved_ids ) > 0: self._extracted_cells = self._mesh.extract_cells( saved_ids ) - def _check_cell_inside_box( self, cell: pv.Cell, box_bounds: list[ float ] ) -> bool: + def _check_cell_inside_box( self, cell: pv.Cell, box_bounds: Any ) -> bool: """Check if the cell is inside the box bounds. A cell is considered inside the box if his bounds are completely diff --git a/geos-trame/src/geos/trame/app/ui/viewer/viewer.py b/geos-trame/src/geos/trame/app/ui/viewer/viewer.py index 4ea3c705a..46ceddb38 100644 --- a/geos-trame/src/geos/trame/app/ui/viewer/viewer.py +++ b/geos-trame/src/geos/trame/app/ui/viewer/viewer.py @@ -401,8 +401,8 @@ def _update_box( self, active_block: Box, show_obj: bool ) -> None: return if self.box_engine is not None and active_block.name in self.box_engine: - box_polydata_actor: pv.Actor = self.box_engine[ active_block.name ].get_box_polydata_actor() - extracted_cell_actor: pv.Actor = self.box_engine[ active_block.name ].get_extracted_cells_actor() + box_polydata_actor: pv.BasePlotter = self.box_engine[ active_block.name ].get_box_polydata_actor() + extracted_cell_actor: pv.BasePlotter = self.box_engine[ active_block.name ].get_extracted_cells_actor() self.plotter.remove_actor( box_polydata_actor ) self.plotter.remove_actor( extracted_cell_actor ) del self.box_engine[ active_block.name ] diff --git a/geos-trame/src/geos/trame/app/utils/async_file_watcher.py b/geos-trame/src/geos/trame/app/utils/async_file_watcher.py new file mode 100644 index 000000000..f8960e580 --- /dev/null +++ b/geos-trame/src/geos/trame/app/utils/async_file_watcher.py @@ -0,0 +1,65 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright 2023-2024 TotalEnergies. +# SPDX-FileContributor: Jacques Franc + +import asyncio +from asyncio import CancelledError, ensure_future +from typing import Callable + +from trame_server.utils import asynchronous + + +class AsyncPeriodicRunner: + """While started, runs given callback at given period.""" + + def __init__( self, callback: Callable, period_ms: int = 100 ) -> None: + """Init the async watcher object.""" + self.last_m_time = None + self.callback = callback + self.period_ms = period_ms + self.task = None + self.start() + + def __del__( self ) -> None: + """Clean up async watch on destruction.""" + self.stop() + + def set_period_ms( self, period_ms: int ) -> None: + """Set the async watch period. + + :params:period_ms period in ms + """ + self.period_ms = period_ms + + def start( self ) -> None: + """Stop existing async watch and start a new stream.""" + self.stop() + self.task = asynchronous.create_task( self._runner() ) + + def stop( self ) -> None: + """Stop the async watch.""" + if not self.task: + return + + ensure_future( self._wait_for_cancel() ) # type:ignore[unreachable] + + async def _wait_for_cancel( self ) -> None: + """Cancel and await cancel error for the task. + + If cancel is done outside async, it may raise warnings as cancelled exception may be triggered outside async + loop. + """ + if not self.task or self.task.done() or self.task.cancelled(): # type:ignore[unreachable] + self.task = None + return + + try: # type:ignore[unreachable] + self.task.cancel() + await self.task + except CancelledError: + self.task = None + + async def _runner( self ) -> None: + while True: + self.callback() + await asyncio.sleep( self.period_ms / 1000.0 ) diff --git a/geos-trame/src/geos/trame/assets/cluster.json b/geos-trame/src/geos/trame/assets/cluster.json new file mode 100644 index 000000000..221dd11d1 --- /dev/null +++ b/geos-trame/src/geos/trame/assets/cluster.json @@ -0,0 +1,68 @@ +[ + { + "name": "p4", + "host": "p4log01", + "partition": "p4_general", + "partition_transfert": "p4_transfer", + "port": 22, + "geos_path": "/workrd/users/$USER/GEOS/build-spack-generated-debug/bin/geosx", + "geos_module": "/workrd/users/$USER/modulesRHEL88", + "geos_load_list": [ + "geos-daily-rhel88" + ], + "remote_home_base": "/users/$USER", + "simulation_default_filename": "geosDeck.xml", + "simulation_remote_path": "/workrd/users/$USER/Example", + "simulation_dl_default_path": "/users/$USER/Example", + "simulation_information_default_path": "/users/$USER/.trame-logs", + "n_nodes": 212, + "cores_per_node": 192, + "mem_per_node": 747 + }, + { + "name": "pine", + "host": "pine-1", + "partition": "pine", + "partition_transfert": "pine", + "port": 22, + "geos_path": "/shared/data1/Users/$USER/codes/GEOS-2025-11-03/build-pine-1.pine.cluster-linux-rocky9-zen4-gcc@11.4.1-release/bin/geosx", + "geos_module": "/apps/modules/modulefiles3", + "geos_load_list": [ + "genesis", + "common", + "proxy", + "slurm", + "gcc/11.4.1", + "openmpi-gcc/5.0.5", + "cmake/3.27.9" + ], + "remote_home_base": "/home/$USER", + "simulation_default_filename": "geosDeck.xml", + "simulation_remote_path": "/shared/data1/Users/$USER/Example", + "simulation_dl_default_path": "/shared/data1/Users/$USER/Example", + "simulation_information_default_path": "/home/$USER/.trame-logs", + "n_nodes": 48, + "cores_per_node": 64, + "mem_per_node": 768 + }, + { + "name": "local", + "host": "127.0.0.1", + "partition": "debug", + "partition_transfert": "debug", + "port": 22, + "geos_path": "/opt/GEOS/build-spack-generated-debug/bin/geosx", + "geos_module": "/workrd/users/$USER/geos-generated", + "geos_load_list": [ + "geos-toolchains" + ], + "remote_home_base": "/home/$USER", + "simulation_default_filename": "geosDeck.xml", + "simulation_remote_path": "/work/", + "simulation_dl_default_path": "/data/", + "simulation_information_default_path": "/home/.trame-logs", + "n_nodes": 1, + "cores_per_node": 8, + "mem_per_node": 32 + } +] \ No newline at end of file diff --git a/install_packages.sh b/install_packages.sh index 3ded06357..3f9a454a4 100755 --- a/install_packages.sh +++ b/install_packages.sh @@ -11,8 +11,10 @@ python -m pip install --upgrade ./mesh-doctor python -m pip install --upgrade ./pygeos-tools python -m pip install --upgrade ./geos-pv #! trame install requires npm -# cd ./geos-trame/vue-components -# npm i -# npm run build -# cd ../../ -# python -m pip install ./geos-trame \ No newline at end of file +cd ./geos-trame/vue-components +npm i +npm run build +cd .. +sh configure.sh +cd .. +python -m pip install ./geos-trame