diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3ff88ba..c0612e5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -29,6 +29,7 @@ repos: hooks: - id: flake8 args: [--max-line-length=80, --extend-ignore=E203] + exclude: ^docs/ additional_dependencies: [ 'flake8-blind-except', 'flake8-bugbear', diff --git a/pysus/api/dadosgov/client.py b/pysus/api/dadosgov/client.py index 4e6cbde..d56b57b 100644 --- a/pysus/api/dadosgov/client.py +++ b/pysus/api/dadosgov/client.py @@ -154,7 +154,7 @@ async def _download_file( self, file: BaseRemoteFile, output: pathlib.Path, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> pathlib.Path: """Download a remote file to a local path.""" if self._client is None: @@ -162,13 +162,22 @@ async def _download_file( "Client not connected. Call login(token=...) first.", ) - async with self._client.stream("GET", str(file.path)) as response: + url = ( + str(file.path) + .replace("https:/", "https://") + .replace("http:/", "http://") + ) + + async with self._client.stream("GET", url) as response: response.raise_for_status() + total = int(response.headers.get("Content-Length", 0)) + downloaded = 0 with open(output, "wb") as f: async for chunk in response.aiter_bytes(): f.write(chunk) + downloaded += len(chunk) if callback: - callback(len(chunk)) + callback(downloaded, total) return output @@ -181,9 +190,7 @@ class Recurso(BaseModel): title: str = Field(alias="titulo") url: str = Field(alias="link") api_size: int = Field(alias="tamanho") - last_modified: datetime | None = Field( - None, alias="dataUltimaAtualizacaoArquivo" - ) + last_modified: DateTime = Field(None, alias="dataUltimaAtualizacaoArquivo") file_name: str | None = Field(None, alias="nomeArquivo") async def get_size(self) -> int: diff --git a/pysus/api/dadosgov/databases.py b/pysus/api/dadosgov/databases.py index cc45659..192587a 100644 --- a/pysus/api/dadosgov/databases.py +++ b/pysus/api/dadosgov/databases.py @@ -1,9 +1,39 @@ """Pre-configured health database definitions accessible via dados.gov.br.""" +import re from typing import Any +from pysus.utils import zfill_year + from .models import Dataset +MONTHS: dict[str, int] = { + "jan": 1, + "fev": 2, + "mar": 3, + "abr": 4, + "mai": 5, + "jun": 6, + "jul": 7, + "ago": 8, + "set": 9, + "out": 10, + "nov": 11, + "dez": 12, +} + + +def _parse_year(val: str) -> int | None: + try: + y = int(val) + return y if 1970 <= y <= 2100 else None + except ValueError: + return None + + +def _skip(name: str) -> bool: + return name.startswith("get_") or name.lower().endswith(".pdf") + class CNES(Dataset): """Cadastro Nacional de Estabelecimentos de Saúde (CNES).""" @@ -32,8 +62,23 @@ def description(self) -> str: ) def formatter(self, filename: str) -> dict[str, Any]: - """Extract metadata from a filename (not yet implemented).""" - raise NotImplementedError() + """Parse a CNES filename and extract metadata.""" + try: + name = filename.strip() + if _skip(name): + return {"state": None, "year": None, "month": None} + + m = re.search(r"_(\d{2})-(\d{4})\.csv$", name) + if m: + return { + "state": None, + "year": _parse_year(m.group(2)), + "month": int(m.group(1)), + } + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} class PNI(Dataset): @@ -49,6 +94,18 @@ class PNI(Dataset): "9a25b796-80e3-444a-a4e7-405f5596d8ab", ] + _PNI_PREFIX = "doses-aplicadas-pelo-programa-de-nacional-de-imunizacoes-pni" + + group_aliases: dict[str, str] = { + _PNI_PREFIX: "DPNI", + f"{_PNI_PREFIX}-2020": "DPNI", + f"{_PNI_PREFIX}-2021": "DPNI", + f"dataset-{_PNI_PREFIX}_2022": "DPNI", + f"{_PNI_PREFIX}-2023": "DPNI", + f"{_PNI_PREFIX}-2025": "DPNI", + f"{_PNI_PREFIX}-2026": "DPNI", + } + @property def name(self) -> str: """Return the short name.""" @@ -64,8 +121,21 @@ def description(self) -> str: return "O PNI monitora a cobertura vacinal e doses aplicadas no Brasil." def formatter(self, filename: str) -> dict[str, Any]: - """Extract metadata from a filename (not yet implemented).""" - raise NotImplementedError() + """Parse a PNI vaccination filename into month and year.""" + try: + name = filename.strip().lower() + if _skip(name): + return {"state": None, "year": None, "month": None} + + m = re.match(r"vacinacao_(\w{3})_(\d{4})_csv\.zip", name) + if m: + month = MONTHS.get(m.group(1)) + year = _parse_year(m.group(2)) + return {"state": None, "year": year, "month": month} + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} class SIA(Dataset): @@ -92,8 +162,31 @@ def description(self) -> str: """ def formatter(self, filename: str) -> dict[str, Any]: - """Extract metadata from a filename (not yet implemented).""" - raise NotImplementedError() + """Parse an SIA filename into year.""" + try: + name = filename.strip().lower() + if _skip(name): + return {"state": None, "year": None, "month": None} + + m = re.search(r"_(\d{4})_\.csv$", name) + if m: + return { + "state": None, + "year": _parse_year(m.group(1)), + "month": None, + } + + m = re.search(r"_(\w{3})-out_(\d{4})_\.csv$", name) + if m: + return { + "state": None, + "year": _parse_year(m.group(2)), + "month": None, + } + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} class SINAN(Dataset): @@ -104,8 +197,21 @@ class SINAN(Dataset): "5699abe0-0510-4da8-b47d-209b3bb32b34", "4557ba96-7d52-4a56-bd6f-f99a5af09f77", "740ce8f4-7a5d-4351-aad4-7623f2490ada", + "cf044c1b-b966-4d0e-bab0-f3aa65897b7d", + "2d4997fb-cd11-4ce2-b217-09cd50e3151f", + "8a585222-4c2e-43b7-807d-59355ee79c48", + "527e8665-de64-4f81-b7c3-40b59c7d1d3c", ] + group_aliases: dict[str, str] = { + "arboviroses-dengue": "DENG", + "arboviroses-febre-de-chikungunya": "CHIK", + "arboviroses-zika-virus": "ZIKA", + "hanseniase": "HANS", + "dados-tuberculose": "TUBE", + "sifilis": "SIFA", + } + @property def name(self) -> str: """Return the short name.""" @@ -124,8 +230,31 @@ def description(self) -> str: """ def formatter(self, filename: str) -> dict[str, Any]: - """Extract metadata from a filename (not yet implemented).""" - raise NotImplementedError() + """Parse a SINAN filename into state and year.""" + try: + name = filename.strip().upper() + if _skip(name): + return {"state": None, "year": None, "month": None} + + m = re.match(r"(\w{4})(BR)(\d{2})\.CSV\.ZIP", name) + if m: + return { + "state": m.group(2), + "year": zfill_year(m.group(3)), + "month": None, + } + + m = re.match(r"MPX_(\d{4})_OPENDATASUS\.CSV\.ZIP", name) + if m: + return { + "state": None, + "year": _parse_year(m.group(1)), + "month": None, + } + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} class SIM(Dataset): @@ -135,6 +264,10 @@ class SIM(Dataset): "5f121f4d-47c6-428e-8ec6-e8ec56417172", ] + group_aliases: dict[str, str] = { + "sim-1979-2019": "DO", + } + @property def name(self) -> str: """Return the short name.""" @@ -152,8 +285,31 @@ def description(self) -> str: """ def formatter(self, filename: str) -> dict[str, Any]: - """Extract metadata from a filename (not yet implemented).""" - raise NotImplementedError() + """Parse a SIM filename into year.""" + try: + name = filename.strip() + if _skip(name): + return {"state": None, "year": None, "month": None} + + m = re.search(r"Mortalidade_Geral_(\d{4})_csv\.zip", name) + if m: + return { + "state": None, + "year": _parse_year(m.group(1)), + "month": None, + } + + m = re.match(r"DO(\d{2})OPEN", name) + if m: + return { + "state": None, + "year": zfill_year(m.group(1)), + "month": None, + } + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} class SINASC(Dataset): @@ -163,6 +319,10 @@ class SINASC(Dataset): "441cc6bd-684a-4afd-a88b-ba4734c9e83e", ] + group_aliases: dict[str, str] = { + "sistema-de-informacao-sobre-nascidos-vivos-sinasc-1996-a-20201": "DN", + } + @property def name(self) -> str: """Return the short name.""" @@ -181,8 +341,67 @@ def description(self) -> str: """ def formatter(self, filename: str) -> dict[str, Any]: - """Extract metadata from a filename (not yet implemented).""" - raise NotImplementedError() + """Parse a SINASC filename into year.""" + try: + name = filename.strip() + if _skip(name): + return {"state": None, "year": None, "month": None} + + m = re.search(r"SINASC_(\d{4})_csv\.zip", name) + if m: + return { + "state": None, + "year": _parse_year(m.group(1)), + "month": None, + } + + m = re.search(r"DNBR(\d{4})_csv\.zip", name) + if m: + return { + "state": "BR", + "year": _parse_year(m.group(1)), + "month": None, + } + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} + + +class COVID19(Dataset): + """Casos Confirmados de COVID-19.""" + + ids: list[str] = [ + "1ba1801e-aec0-4dba-ae2a-7732f0a0c9f7", + ] + + @property + def name(self) -> str: + """Return the short name.""" + return "COVID19" + + @property + def long_name(self) -> str: + """Return the human-readable name.""" + return "Casos Confirmados de COVID-19" + + @property + def description(self) -> str: + return "Dados anonimizados de casos confirmados de COVID-19." + + def formatter(self, filename: str) -> dict[str, Any]: + """Parse a COVID-19 filename.""" + try: + name = filename.strip().lower() + if _skip(name) or name.endswith(".xlsx"): + return {"state": None, "year": None, "month": None} + + if name.endswith(".csv"): + return {"state": None, "year": None, "month": None} + + return {"state": None, "year": None, "month": None} + except (IndexError, ValueError): + return {"state": None, "year": None, "month": None} AVAILABLE_DATABASES: list[type[Dataset]] = [ @@ -192,4 +411,5 @@ def formatter(self, filename: str) -> dict[str, Any]: SIM, SINAN, SINASC, + COVID19, ] diff --git a/pysus/api/dadosgov/models.py b/pysus/api/dadosgov/models.py index bc763f4..a582eb7 100644 --- a/pysus/api/dadosgov/models.py +++ b/pysus/api/dadosgov/models.py @@ -2,10 +2,11 @@ import asyncio import pathlib +import re from abc import abstractmethod from collections.abc import Callable from datetime import datetime -from typing import TYPE_CHECKING, Any +from typing import Any import httpx from dateparser import parse # type: ignore[import-untyped] @@ -14,10 +15,40 @@ from pysus.api.models import BaseRemoteDataset, BaseRemoteFile, BaseRemoteGroup from pysus.api.types import State -from .client import ConjuntoDados, Recurso +from .client import ConjuntoDados, DadosGov, Recurso -if TYPE_CHECKING: - from .client import DadosGov +_FORMAT_RE = re.compile(r"[._](csv|json|xml)(\.zip)?$", re.IGNORECASE) + + +def _dedup_entries( + entries: list[tuple[str, Any, dict]], +) -> list[tuple[str, Any, dict]]: + """If the same file exists in CSV, JSON and XML, keep only CSV.""" + grouped: dict[str, list[tuple[str, str, Any, dict]]] = {} + for filename, recurso, metadata in entries: + m = _FORMAT_RE.search(filename) + if m: + stem = filename[: m.start()] + fmt = m.group(1).lower() + grouped.setdefault(stem, []).append( + (fmt, filename, recurso, metadata) + ) + else: + grouped.setdefault(filename, []).append( + ("", filename, recurso, metadata) + ) + + result: list[tuple[str, Any, dict]] = [] + for _, items in grouped.items(): + formats = {fmt for fmt, _, _, _ in items} + if "csv" in formats: + for fmt, filename, recurso, metadata in items: + if fmt == "csv": + result.append((filename, recurso, metadata)) + else: + for _, filename, recurso, metadata in items: + result.append((filename, recurso, metadata)) + return result class File(BaseRemoteFile): @@ -32,7 +63,6 @@ def __init__(self, **data): metadata = data.pop("_metadata", {}) super().__init__(**data) self._metadata = metadata - self._path = self.record.url def __repr__(self): return self.basename @@ -113,7 +143,7 @@ async def fetch_metadata(self) -> None: async def _download( self, output: pathlib.Path | None = None, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> pathlib.Path: """Download the file to a local path.""" if not output: @@ -148,13 +178,9 @@ class Group(BaseRemoteGroup): """A group of files within a dataset.""" record: ConjuntoDados - _formatter: ( - Callable[ - [Recurso, "Group"], - dict[str, Any], - ] - | None - ) = PrivateAttr(default=None) + _formatter: Callable[[str], dict[str, Any]] | None = PrivateAttr( + default=None + ) def __init__( self, @@ -163,8 +189,9 @@ def __init__( formatter: Callable | None = None, ): """Initialize the Group with a dataset record and optional formatter.""" - super().__init__(dataset=dataset) - self.record = record + super().__init__( + record=record, dataset=dataset # type: ignore[call-arg] + ) self._formatter = formatter def __repr__(self): @@ -172,8 +199,10 @@ def __repr__(self): @property def name(self) -> str: - """Return the group slug name.""" - return self.record.slug + """Return the group name, resolved through dataset aliases.""" + slug = self.record.slug + aliases = getattr(self.dataset, "group_aliases", {}) + return aliases.get(slug, slug) @property def long_name(self) -> str: @@ -187,13 +216,30 @@ def description(self) -> str: async def _fetch_files(self) -> list[BaseRemoteFile]: """Build File objects from the underlying resources.""" - files: list[BaseRemoteFile] = [] + entries: list[tuple[str, Any, dict]] = [] for recurso in self.record.resources: - metadata = self._formatter(recurso, self) if self._formatter else {} + filename = ( + recurso.file_name or recurso.url.split("/")[-1].split("?")[0] + ) + if filename.lower().endswith(".pdf") or filename.startswith("get_"): + continue + metadata = {} + if self._formatter: + try: + metadata = self._formatter(filename) + except NotImplementedError: + pass + entries.append((filename, recurso, metadata)) + + entries = _dedup_entries(entries) + + files: list[BaseRemoteFile] = [] + for _, recurso, metadata in entries: file = File( record=recurso, dataset=self.dataset, group=self, + path=recurso.url, _metadata=metadata, ) files.append(file) @@ -205,6 +251,7 @@ class Dataset(BaseRemoteDataset): ids: list[str] = [] client: "DadosGov" + group_aliases: dict[str, str] = {} def __repr__(self): return self.name diff --git a/pysus/api/ducklake/client.py b/pysus/api/ducklake/client.py index 469da9b..1b78e0d 100644 --- a/pysus/api/ducklake/client.py +++ b/pysus/api/ducklake/client.py @@ -243,7 +243,7 @@ async def _download_file( self, file: BaseRemoteFile, output: Path, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> Path: """Download a single file from object storage to the local path.""" if not isinstance(file, File): @@ -253,11 +253,14 @@ async def _download_file( async with httpx.AsyncClient(follow_redirects=True) as client: async with client.stream("GET", url) as r: r.raise_for_status() + total = int(r.headers.get("Content-Length", 0)) + downloaded = 0 with open(output, "wb") as f: async for chunk in r.aiter_bytes(chunk_size=1024 * 1024): await to_thread.run_sync(f.write, chunk) + downloaded += len(chunk) if callback: - callback(len(chunk)) + callback(downloaded, total) return output async def _download_catalog(self, client: httpx.AsyncClient): diff --git a/pysus/api/ducklake/models.py b/pysus/api/ducklake/models.py index 527f2ca..baf0e66 100644 --- a/pysus/api/ducklake/models.py +++ b/pysus/api/ducklake/models.py @@ -64,7 +64,7 @@ def sha256(self) -> str | None: async def _download( self, output: Path | None = None, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> Path: """Download the file from object storage to the given output path.""" if not output: diff --git a/pysus/api/extensions.py b/pysus/api/extensions.py index 6acb9c8..0451b42 100644 --- a/pysus/api/extensions.py +++ b/pysus/api/extensions.py @@ -106,7 +106,19 @@ class CSV(BaseTabularFile): @property def columns(self) -> list[str]: """Return the column names from the CSV header row.""" - df = pd.read_csv(self.path, sep=",", nrows=0) + if self._encoding is not None: + enc = self._encoding + else: + with open(self.path, "rb") as f: + raw = f.read(1024 * 300) + raw_enc = chardet.detect(raw)["encoding"] + enc = ( + "iso-8859-1" + if raw_enc is None or raw_enc.lower() == "ascii" + else raw_enc + ) + self._encoding = enc + df = pd.read_csv(self.path, sep=",", nrows=0, encoding=enc) return df.columns.tolist() @property @@ -127,7 +139,10 @@ def detect(): return chardet.detect(f.read(1024 * 300)) result = await to_thread.run_sync(detect) - self._encoding = result["encoding"] or "utf-8" + enc = result["encoding"] + if enc is None or enc.lower() == "ascii": + enc = "iso-8859-1" + self._encoding = enc return self._encoding async def _get_sep(self) -> str: diff --git a/pysus/api/ftp/client.py b/pysus/api/ftp/client.py index 5f2f0c2..3c1b46f 100644 --- a/pysus/api/ftp/client.py +++ b/pysus/api/ftp/client.py @@ -13,9 +13,8 @@ from pysus.api.models import BaseRemoteClient, BaseRemoteFile if TYPE_CHECKING: - from pysus.api.types import State - from pysus.api.ftp.models import Dataset + from pysus.api.types import State class FTPGroupInfo(TypedDict): diff --git a/pysus/api/ftp/models.py b/pysus/api/ftp/models.py index 4efb034..06bf291 100644 --- a/pysus/api/ftp/models.py +++ b/pysus/api/ftp/models.py @@ -86,7 +86,7 @@ def state(self) -> State | None: async def _download( self, output: Path | None = None, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> Path: """Download this file to a local path, optionally reporting progress.""" if output is None: diff --git a/pysus/api/models.py b/pysus/api/models.py index e377139..5e883c3 100644 --- a/pysus/api/models.py +++ b/pysus/api/models.py @@ -201,6 +201,18 @@ async def to_parquet( chunk, ) + schema = table.schema + if any(pa.types.is_null(f.type) for f in schema): + new_fields = [ + ( + pa.field(f.name, pa.string(), nullable=True) + if pa.types.is_null(f.type) + else f + ) + for f in schema + ] + table = table.cast(pa.schema(new_fields)) + if writer is None: writer = await to_thread.run_sync( pq.ParquetWriter, output_path, table.schema @@ -307,7 +319,7 @@ def state(self) -> State | None: async def _download( self, output: Path | None = None, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> Path: """Download the file to *output* and return the local path. @@ -317,7 +329,7 @@ async def _download( async def download( self, output: str | Path | None = None, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> BaseLocalFile: """Download the remote file to a local cache or *output* path. @@ -481,6 +493,6 @@ async def _download_file( self, file: BaseRemoteFile, output: Path, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> Path: """Download a single *file* to *output* and return the local path.""" diff --git a/pysus/management/client.py b/pysus/management/client.py index 7bb29a4..d06f705 100644 --- a/pysus/management/client.py +++ b/pysus/management/client.py @@ -90,7 +90,7 @@ async def upload( cursor.execute( "SELECT id FROM pysus.dataset_groups " f"WHERE name = '{group_name}' AND " - "dataset_id = {dataset_id}" + f"dataset_id = {dataset_id}" ) row = cursor.fetchone() if row: diff --git a/pysus/tests/api/test_models.py b/pysus/tests/api/test_models.py index 6a14cfc..f559b96 100644 --- a/pysus/tests/api/test_models.py +++ b/pysus/tests/api/test_models.py @@ -42,7 +42,7 @@ def modify(self) -> datetime: async def _download( self, output: Path | None = None, - callback: Callable[[int], None] | None = None, + callback: Callable[[int, int], None] | None = None, ) -> Path: if not output: raise ValueError()