diff --git a/README.md b/README.md index e884a50..9391b20 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ The easiest way to get data as a pandas DataFrame: from pysus import sinan, sinasc, sim, sih, sia, pni, ibge, cnes, ciha # Download SINAN Dengue data for 2024 -df = sinan(disease="deng", year=2024) +df = sinan(disease="deng", year=2000) # Multiple years df = sinan(disease="deng", year=[2023, 2024]) @@ -81,7 +81,7 @@ async def main(): df = pysus.read_parquet(paths, mode="union").df() ``` -### Using the TUI +### Using the TUI (unstable/under testing) Launch the interactive text-based interface: diff --git a/pysus/api/_impl/databases.py b/pysus/api/_impl/databases.py index 615634d..fc8aeca 100644 --- a/pysus/api/_impl/databases.py +++ b/pysus/api/_impl/databases.py @@ -267,6 +267,7 @@ def _query(): "name": r.path.split("/")[-1], "path": r.path, "dataset": r.dataset.name if r.dataset else None, + "group": r.group.name if r.group else None, "year": r.year, "month": r.month, "state": r.state, diff --git a/pysus/api/ducklake/client.py b/pysus/api/ducklake/client.py index 35959b0..b94ef75 100644 --- a/pysus/api/ducklake/client.py +++ b/pysus/api/ducklake/client.py @@ -14,7 +14,42 @@ from sqlalchemy.pool import StaticPool from .catalog import CatalogDataset, CatalogFile, DatasetGroup -from .models import Dataset, File +from .models import DuckDataset, File + + +class CatalogDatasetAdapter: + def __init__(self, catalog_dataset: CatalogDataset, ducklake): + self.name = catalog_dataset.name + self.long_name = catalog_dataset.long_name or "" + self.description = catalog_dataset.description or "" + self.group_definitions: dict[str, str] = {} + self.ducklake = ducklake + self.client = ducklake + + @property + def content(self): + return self.ducklake.query(dataset=self.name.upper()) + + +class DatasetGroupAdapter: + def __init__(self, dataset_group: DatasetGroup, dataset): + self.name = dataset_group.name + self.long_name = dataset_group.long_name or "" + self.description = dataset_group.description or "" + self.dataset = dataset + + def __str__(self): + return self.name + + @property + async def files(self): + return [] + + async def _fetch_files(self): + return [] + + async def search(self, **kwargs): + return [] class DuckLakeCredentials(BaseModel): @@ -66,7 +101,7 @@ def _catalog_url(self) -> str: def _is_authenticated(self) -> bool: return self.credentials is not None - async def datasets(self, **kwargs) -> list[Dataset]: + async def datasets(self, **kwargs) -> list[DuckDataset]: if not self._Session: await self.connect() @@ -86,7 +121,7 @@ def _fetch(): return results records = await to_thread.run_sync(_fetch) - return [Dataset(record=rec, client=self) for rec in records] + return [DuckDataset(record=rec, client=self) for rec in records] async def login( self, @@ -300,6 +335,13 @@ def _query(): records = await to_thread.run_sync(_query) return [ - File(path=r.path, record=r, dataset=r.dataset, group=r.group) + File( + path=r.path, + record=r, + dataset=CatalogDatasetAdapter(r.dataset, self), + group=( + DatasetGroupAdapter(r.group, r.dataset) if r.group else None + ), + ) for r in records ] diff --git a/pysus/api/ducklake/models.py b/pysus/api/ducklake/models.py index 425cd6c..bb4e8da 100644 --- a/pysus/api/ducklake/models.py +++ b/pysus/api/ducklake/models.py @@ -2,6 +2,7 @@ from collections.abc import Callable from datetime import datetime from pathlib import Path +from typing import Any, Union import anyio from pydantic import Field @@ -19,6 +20,8 @@ class File(BaseRemoteFile): 
record: CatalogFile = Field(exclude=True) type: str = "remote" + dataset: Any + group: Any = None @property def basename(self) -> str: @@ -51,6 +54,7 @@ async def _download( ) -> Path: if not output: output = CACHEPATH / self.name + return await self.client._download_file( self, output, @@ -72,41 +76,7 @@ def _calculate(): return actual_hash == self.sha256 -class Group(BaseRemoteGroup): - record: DatasetGroup = Field(exclude=True) - dataset: "Dataset" = Field(exclude=True) - - @property - def name(self) -> str: - return self.record.name - - @property - def long_name(self) -> str: - return ( - self.record.group_metadata.long_name - if self.record.group_metadata - else self.name - ) - - @property - def description(self) -> str: - if self.record.group_metadata: - return self.record.group_metadata.description - return "" - - async def _fetch_files(self) -> list[BaseRemoteFile]: - return [ - File( - path=f.path, - record=f, - group=self, - dataset=self.dataset, - ) - for f in self.record.files - ] - - -class Dataset(BaseRemoteDataset): +class DuckDataset(BaseRemoteDataset): record: CatalogDataset = Field(exclude=True) client: BaseRemoteClient = Field(exclude=True) @@ -133,14 +103,12 @@ def description(self) -> str: else "" ) - async def _fetch_content( - self, - ) -> list[Group | File]: - items: list[Group | File] = [] + async def _fetch_content(self) -> list[Union["DuckGroup", File]]: + items: list[Union["DuckGroup", File]] = [] if self.record.groups: items.extend( - [Group(record=g, dataset=self) for g in self.record.groups], + [DuckGroup(record=g, dataset=self) for g in self.record.groups] ) if self.record.files: @@ -152,7 +120,42 @@ async def _fetch_content( dataset=self, ) for f in self.record.files - ], + ] ) return items + + +class DuckGroup(BaseRemoteGroup): + record: DatasetGroup = Field(exclude=True) + dataset: DuckDataset = Field(exclude=True) + + @property + def name(self) -> str: + return self.record.name + + @property + def long_name(self) -> str: + return ( + self.record.group_metadata.long_name + if self.record.group_metadata + else self.name + ) + + @property + def description(self) -> str: + if self.record.group_metadata: + return self.record.group_metadata.description + return "" + + async def _fetch_files(self) -> list[BaseRemoteFile]: + files: list[BaseRemoteFile] = [ + File( + path=f.path, + record=f, + group=self, + dataset=self.dataset, + ) + for f in self.record.files + ] + return files diff --git a/pysus/api/ftp/databases.py b/pysus/api/ftp/databases.py index 5df0278..5b042da 100644 --- a/pysus/api/ftp/databases.py +++ b/pysus/api/ftp/databases.py @@ -403,12 +403,55 @@ class SINAN(Dataset): ] group_definitions: dict[str, str] = { - "DENG": "Dengue", - "ZIKA": "Zika Vírus", + "ACBI": "Acidente de trabalho com material biológico", + "ACGR": "Acidente de trabalho", + "ANIM": "Acidente por Animais Peçonhentos", + "ANTR": "Atendimento Antirrabico", + "BOTU": "Botulismo", + "CANC": "Cancêr relacionado ao trabalho", + "CHAG": "Doença de Chagas Aguda", "CHIK": "Febre de Chikungunya", + "COLE": "Cólera", + "COQU": "Coqueluche", + "DENG": "Dengue", + "DERM": "Dermatoses ocupacionais", + "DIFT": "Difteria", + "ESQU": "Esquistossomose", + "EXAN": "Doença exantemáticas", + "FMAC": "Febre Maculosa", + "FTIF": "Febre Tifóide", "HANS": "Hanseníase", + "HANT": "Hantavirose", + "HEPA": "Hepatites Virais", + "IEXO": "Intoxicação Exógena", + "INFL": "Influenza Pandêmica", + "LEIV": "Leishmaniose Visceral", + "LEPT": "Leptospirose", + "LERD": "LER/Dort", + "LTAN": "Leishmaniose Tegumentar 
Americana", + "MALA": "Malária", + "MENI": "Meningite", + "MENT": "Transtornos mentais relacionados ao trabalho", + "NTRA": "Notificação de Tracoma", + "PAIR": "Perda auditiva por ruído relacionado ao trabalho", + "PEST": "Peste", + "PFAN": "Paralisia Flácida Aguda", + "PNEU": "Pneumoconioses realacionadas ao trabalho", + "RAIV": "Raiva", + "SDTA": "Surto Doenças Transmitidas por Alimentos", + "SIFA": "Sífilis Adquirida", + "SIFC": "Sífilis Congênita", + "SIFG": "Sífilis em Gestante", + "SRC": "Síndrome da Rubéola Congênia", + "TETA": "Tétano Acidental", + "TETN": "Tétano Neonatal", + "TOXC": "Toxoplasmose Congênita", + "TOXG": "Toxoplasmose Gestacional", + "TRAC": "Inquérito de Tracoma", "TUBE": "Tuberculose", - "ANIM": "Acidente por Animais Peçonhentos", + "VARC": "Varicela", + "VIOL": "Violência doméstica, sexual e/ou outras violências", + "ZIKA": "Zika Vírus", } @property diff --git a/pysus/api/models.py b/pysus/api/models.py index e9a08d3..aa7bbef 100644 --- a/pysus/api/models.py +++ b/pysus/api/models.py @@ -327,6 +327,7 @@ async def search(self, **kwargs) -> list[BaseRemoteFile]: class BaseRemoteDataset(BaseRemoteObject, SearchableMixin, ABC): client: BaseRemoteClient = Field(exclude=True) + group_definitions: dict[str, str] = {} _content: Sequence[BaseRemoteGroup | BaseRemoteFile] | None = PrivateAttr( default=None ) diff --git a/pysus/management/client.py b/pysus/management/client.py index 31ac16f..7bb29a4 100644 --- a/pysus/management/client.py +++ b/pysus/management/client.py @@ -1,20 +1,12 @@ import asyncio import os from collections.abc import Callable -from datetime import datetime from logging import error from pathlib import Path from anyio import to_thread from pysus.api.client import PySUS from pysus.api.dadosgov.models import File as APIFile -from pysus.api.ducklake.catalog import ( - CatalogDataset, - CatalogFile, - ColumnDefinition, - DatasetGroup, - Origin, -) from pysus.api.extensions import Parquet from pysus.api.ftp.models import File as FTPFile from pysus.api.models import BaseRemoteFile @@ -47,9 +39,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): ducklake = self.pysus._ducklake if ducklake: await ducklake._upload_catalog() - except Exception as e: # noqa - error(e) - pass finally: await self.pysus.__aexit__(exc_type, exc_val, exc_tb) @@ -67,30 +56,8 @@ async def upload( f"/{file.path.with_suffix('.parquet').name}" ) - with self.pysus._ducklake._Session() as session: - existing = ( - session.query(CatalogFile) - .filter(CatalogFile.origin_path == str(file.path)) - .first() - ) - - if not existing: - dataset = self._get_or_create_dataset(session, file) - existing = ( - session.query(CatalogFile) - .filter( - CatalogFile.path == s3_key, - CatalogFile.dataset_id == dataset.id, - ) - .first() - ) - - if existing and not self._should_upload(file, existing): - return - - parquet_ext = await self._download_with_retry(file, callback) - - await self._upload_to_s3(parquet_ext.path, s3_key) + dataset_id = None + group_id = None engine = self.pysus._ducklake._engine with engine.raw_connection() as conn: @@ -118,21 +85,52 @@ async def upload( f"'{file.dataset.long_name}', {origin_val})" ) - year_null = file.year is None - month_null = file.month is None - state_null = file.state is None - state_quoted = f"'{file.state}'" if not state_null else "NULL" + if file.group: + group_name = file.group.name + cursor.execute( + "SELECT id FROM pysus.dataset_groups " + f"WHERE name = '{group_name}' AND " + "dataset_id = {dataset_id}" + ) + row = cursor.fetchone() + if row: + 
group_id = row[0] + else: + cursor.execute( + "SELECT MAX(id) FROM pysus.dataset_groups", + ) + max_id = cursor.fetchone()[0] + group_id = (max_id or 0) + 1 + long_name = file.dataset.group_definitions.get( + group_name.upper(), group_name + ) + cursor.execute( + f"INSERT INTO pysus.dataset_groups " + f"(id, dataset_id, name, long_name) " + f"VALUES ({group_id}, {dataset_id}, " + f"'{group_name}', '{long_name}')" + ) + + group_val = "NULL" if group_id is None else str(group_id) cursor.execute( - f"SELECT id FROM pysus.files WHERE dataset_id = " - f"{dataset_id} AND year = {file.year} " # noqa: E501 - f"AND month {'IS NULL' if month_null else '= ' + str(file.month)} " # noqa: E501 - f"AND state {'IS NULL' if state_null else '= ' + state_quoted}" # noqa: E501 + f"SELECT id, group_id FROM pysus.files WHERE path = '{s3_key}'" # noqa ) row = cursor.fetchone() if row: - file_id = row[0] + file_id, db_group_id = row + + group_mismatch = db_group_id != group_id + should_upload = self._should_upload_raw( + cursor, + file_id, + file, + ) + + if not should_upload and not group_mismatch: + return + cursor.execute( f"DELETE FROM pysus.file_columns WHERE file_id = {file_id}" # noqa ) @@ -144,18 +142,22 @@ async def upload( max_id = cursor.fetchone()[0] file_id = (max_id or 0) + 1 - year_val = "NULL" if year_null else file.year - month_val = "NULL" if month_null else file.month - state_val = "NULL" if state_null else f"'{file.state}'" + parquet_ext = await self._download_with_retry(file, callback) + await self._upload_to_s3(parquet_ext.path, s3_key) + + year_val = "NULL" if file.year is None else file.year + month_val = "NULL" if file.month is None else file.month + state_val = "NULL" if file.state is None else f"'{file.state}'" cursor.execute( f"INSERT INTO pysus.files (id, dataset_id, " - f"path, size, rows, " + f"group_id, path, size, rows, " f"modified, origin_modified, origin_path, year, " f"month, state) " - f"VALUES ({file_id}, {dataset_id}, '{s3_key}', " - f"{parquet_ext.size}, {parquet_ext.rows}, " - f"CURRENT_TIMESTAMP, '{file.modify}', '{file.path}', " + f"VALUES ({file_id}, {dataset_id}, {group_val}, " + f"'{s3_key}', {parquet_ext.size}, " + f"{parquet_ext.rows}, CURRENT_TIMESTAMP, " + f"'{file.modify}', '{file.path}', " f"{year_val}, {month_val}, {state_val})" ) @@ -165,13 +167,17 @@ async def upload( for col in new_columns: cursor.execute( - "INSERT INTO pysus.file_columns " + f"INSERT INTO pysus.file_columns " f"(file_id, column_id) VALUES ({file_id}, {col})" ) conn.commit() - cursor.execute("CHECKPOINT") + + if parquet_ext.path.exists(): + parquet_ext.path.unlink() + await self.pysus._delete_record(str(parquet_ext.path)) + except Exception: # noqa try: conn.rollback() @@ -179,11 +185,6 @@ async def upload( pass raise - if parquet_ext.path.exists(): - parquet_ext.path.unlink() - - await self.pysus._delete_record(str(parquet_ext.path)) - async def _upload_to_s3( self, local_path: Path, @@ -232,159 +233,37 @@ async def _download_with_retry( f"attempts: {last_error}" ) from last_error - def _should_upload( + def _should_upload_raw( self, + cursor, + file_id: int, file: BaseRemoteFile, - catalog_file: CatalogFile | None = None, force: bool = False, ) -> bool: if force: - print(f"force=True, uploading {file.basename}") return True - if catalog_file is None: - print(f"no catalog record, uploading {file.basename}") + cursor.execute( + f"SELECT origin_modified FROM pysus.files WHERE id = {file_id}", + ) + row = cursor.fetchone() + if not row: return True - if catalog_file.origin_modified is None: 
- print(f"no origin_modified, uploading {file.basename}") + origin_modified = row[0] + if origin_modified is None: return True file_mod = getattr(file, "modify", None) if file_mod is None: - print(f"no file modify date, uploading {file.basename}") - return True - - if file_mod > catalog_file.origin_modified: - print(f"{catalog_file.origin_modified} newer than ({file_mod})") return True - print(f"skipping {file.basename} - already up to date") - return False - - def _get_or_create_dataset( - self, - session, - file: BaseRemoteFile, - ) -> CatalogDataset: - ds_name = file.dataset.name.lower() - ds = session.query(CatalogDataset).filter_by(name=ds_name).first() - if not ds: - is_ftp = file.client.name.lower() == "ftp" - origin = Origin.FTP if is_ftp else Origin.API - ds = CatalogDataset( - name=ds_name, long_name=file.dataset.long_name, origin=origin - ) - session.add(ds) - session.flush() - return ds - - def _get_or_create_group( - self, - session, - file: BaseRemoteFile, - dataset: CatalogDataset, - ) -> DatasetGroup | None: - if file.group is None: - return None - - group_name = file.group.name - group = ( - session.query(DatasetGroup) - .filter_by(name=group_name, dataset_id=dataset.id) - .first() - ) - - if not group: - group = DatasetGroup( - name=group_name, - dataset=dataset, - long_name=file.group.long_name, - ) - session.add(group) - session.flush() - return group - - def _get_or_create_file( - self, - session, - file: BaseRemoteFile, - dataset: CatalogDataset, - group: DatasetGroup | None = None, - ) -> CatalogFile: - query = session.query(CatalogFile).filter( - CatalogFile.dataset_id == dataset.id, - CatalogFile.group_id == (group.id if group else None), - CatalogFile.year == file.year, - CatalogFile.month == file.month, - CatalogFile.state == file.state, - ) - - cat_file = query.first() - - if not cat_file: - cat_file = CatalogFile( - dataset=dataset, - group=group, - path=f"pending/{file.basename}", - size=0, - rows=0, - modified=datetime.min, - origin_path=str(file.path), - year=file.year, - month=file.month, - state=file.state, - ) - session.add(cat_file) - session.flush() - - return cat_file - - def _get_or_create_columns( - self, session, dataset: CatalogDataset, file: Parquet - ) -> list[ColumnDefinition]: - existing_cols = {c.name: c for c in dataset.columns} - result = [] - - schema = file.schema - - type_map = { - "int64": "BIGINT", - "int32": "INTEGER", - "double": "DOUBLE", - "float": "FLOAT", - "bool": "BOOLEAN", - "timestamp[us]": "TIMESTAMP", - "string": "VARCHAR", - "binary": "BLOB", - } - - for col_name in schema.names: - field = schema.field(col_name) - arrow_type = str(field.type) - sql_type = type_map.get(arrow_type, "VARCHAR") - - if col_name not in existing_cols: - new_col = ColumnDefinition( - name=col_name, - dataset=dataset, - type=sql_type, - ) - session.add(new_col) - existing_cols[col_name] = new_col - else: - if existing_cols[col_name].type != sql_type: - existing_cols[col_name].type = sql_type - - result.append(existing_cols[col_name]) - - return result + return file_mod > origin_modified def _get_or_create_columns_raw( self, cursor, file: Parquet, dataset_id: int ) -> list[int]: schema = file.schema - type_map = { "int64": "BIGINT", "int32": "INTEGER", @@ -397,7 +276,6 @@ def _get_or_create_columns_raw( } result = [] - for col_name in schema.names: field = schema.field(col_name) arrow_type = str(field.type)