From 84737b3ccd4a15e98d0a0ac2b76f6c1657e1c53a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=A3=20Bida=20Vacaro?= Date: Fri, 22 May 2026 04:17:09 -0300 Subject: [PATCH] feat: include the 'client' field in the querying and prioritize FTP client --- pysus/api/_impl/databases.py | 67 ++++++++----------------- pysus/api/client.py | 2 + pysus/api/ducklake/client.py | 29 ++++++++++- pysus/management/client.py | 5 ++ pysus/tests/api/ducklake/test_client.py | 15 ++++-- pysus/tests/api/test_client.py | 3 ++ 6 files changed, 70 insertions(+), 51 deletions(-) diff --git a/pysus/api/_impl/databases.py b/pysus/api/_impl/databases.py index 9f9078d..684edce 100644 --- a/pysus/api/_impl/databases.py +++ b/pysus/api/_impl/databases.py @@ -24,11 +24,8 @@ from typing import Literal import pandas as pd -from anyio import to_thread from pysus.api.client import PySUS -from pysus.api.ducklake.catalog import CatalogDataset, CatalogFile, DatasetGroup from pysus.api.types import State -from sqlalchemy.orm import joinedload from tqdm import tqdm @@ -302,66 +299,44 @@ def list_files( "CNES", "CIHA", ], + client: Literal["FTP", "DadosGov"] | None = None, group: str | None = None, state: str | None = None, year: int | list[int] | None = None, month: int | list[int] | None = None, **kwargs, ) -> pd.DataFrame: - """List catalog files for a dataset, filtered by group/state/year/month.""" + """List catalog files filtered by client, group, state, year, and month.""" async def _list(): async with PySUS() as pysus: - ducklake = await pysus.get_ducklake() - if ducklake._Session is None: - await ducklake.connect() - - def _query(): - with ducklake._Session() as session: - q = session.query(CatalogFile).options( - joinedload(CatalogFile.dataset), - joinedload(CatalogFile.group), - ) - - if dataset: - q = q.join(CatalogDataset).filter( - CatalogDataset.name == dataset.lower() - ) + years = [year] if isinstance(year, int) else (year or [None]) + months = [month] if isinstance(month, int) else (month or [None]) - if group: - q = q.join(DatasetGroup).filter( - DatasetGroup.name == group + records = [] + for y in years: + for m in months: + records.extend( + await pysus.query( + client=client, + dataset=dataset, + group=group, + state=state, + year=y, + month=m, ) - - if state: - q = q.filter(CatalogFile.state == state.upper()) - - years = [year] if isinstance(year, int) else (year or []) - months = ( - [month] if isinstance(month, int) else (month or []) ) - if years: - q = q.filter(CatalogFile.year.in_(years)) - if months: - q = q.filter(CatalogFile.month.in_(months)) - - results = q.all() - session.expunge_all() - return results - - records = await to_thread.run_sync(_query) - return [ { - "name": r.path.split("/")[-1], - "path": r.path, + "name": str(r.path).split("/")[-1], + "path": str(r.path), "dataset": r.dataset.name if r.dataset else None, "group": r.group.name if r.group else None, - "year": r.year, - "month": r.month, - "state": r.state, - "modify": r.origin_modified, + "year": r.record.year, + "month": r.record.month, + "state": r.record.state, + "modify": r.record.origin_modified, } for r in records ] diff --git a/pysus/api/client.py b/pysus/api/client.py index 7dd06c9..1373755 100644 --- a/pysus/api/client.py +++ b/pysus/api/client.py @@ -414,6 +414,7 @@ def get_completed_remote_paths(self) -> set[str]: async def query( self, + client: Literal["DadosGov", "FTP"] | None = None, dataset: str | None = None, group: str | None = None, state: str | None = None, @@ -426,6 +427,7 @@ async def query( await self.get_ducklake() if self._ducklake is not None: return await self._ducklake.query( + client=client, dataset=dataset, group=group, state=state, diff --git a/pysus/api/ducklake/client.py b/pysus/api/ducklake/client.py index 1b78e0d..47ef426 100644 --- a/pysus/api/ducklake/client.py +++ b/pysus/api/ducklake/client.py @@ -6,7 +6,7 @@ from collections.abc import Callable from pathlib import Path -from typing import Any +from typing import Any, Literal import boto3 import httpx @@ -334,13 +334,14 @@ def _upload(): async def query( self, + client: Literal["FTP", "DadosGov"] | None = None, dataset: str | None = None, group: str | None = None, state: str | None = None, year: int | None = None, month: int | None = None, ) -> list[File]: - """Query catalog files by dataset, group, state, year, and/or month.""" + """Filter catalog files by client, dataset, group, state, year.""" if not self._Session: await self.connect() @@ -380,6 +381,30 @@ def _query(): return results records = await to_thread.run_sync(_query) + + if client: + prefix = f"public/data/{client.lower()}/" + records = [r for r in records if r.path.startswith(prefix)] + else: + ftp = [r for r in records if r.path.startswith("public/data/ftp/")] + dadosgov = [ + r for r in records if r.path.startswith("public/data/dadosgov/") + ] + ftp_keys = set() + for r in ftp: + stem = Path(r.path).stem + key = (r.dataset_id, r.year, r.month, stem) + ftp_keys.add(key) + + def has_ftp_match(r): + stem = Path(r.path).stem + if stem.endswith(".csv"): + stem = stem[:-4] + key = (r.dataset_id, r.year, r.month, stem) + return key in ftp_keys + + records = ftp + [r for r in dadosgov if not has_ftp_match(r)] + return [ File( path=r.path, diff --git a/pysus/management/client.py b/pysus/management/client.py index d06f705..2647c7c 100644 --- a/pysus/management/client.py +++ b/pysus/management/client.py @@ -73,6 +73,11 @@ async def upload( if row: dataset_id = row[0] + origin_val = "'FTP'" if is_ftp else "'API'" + cursor.execute( + f"UPDATE pysus.datasets SET origin = {origin_val} " + f"WHERE id = {dataset_id}" + ) else: cursor.execute("SELECT MAX(id) FROM pysus.datasets") max_id = cursor.fetchone()[0] diff --git a/pysus/tests/api/ducklake/test_client.py b/pysus/tests/api/ducklake/test_client.py index 1775db4..244c22f 100644 --- a/pysus/tests/api/ducklake/test_client.py +++ b/pysus/tests/api/ducklake/test_client.py @@ -42,20 +42,29 @@ async def test_is_authenticated_false_no_credentials(self): @pytest.mark.asyncio async def test_is_authenticated_with_credentials(self): + from unittest.mock import patch + client = DuckLake() - await client.login(access_key="key", secret_key="secret") + with patch.object(client, "_load_catalog"): + await client.login(access_key="key", secret_key="secret") assert client._is_authenticated is True @pytest.mark.asyncio async def test_login_sets_credentials(self): + from unittest.mock import patch + client = DuckLake() - await client.login(access_key="key", secret_key="secret") + with patch.object(client, "_load_catalog"): + await client.login(access_key="key", secret_key="secret") assert client.credentials is not None @pytest.mark.asyncio async def test_login_creates_s3_client(self): + from unittest.mock import patch + client = DuckLake() - await client.login(access_key="key", secret_key="secret") + with patch.object(client, "_load_catalog"): + await client.login(access_key="key", secret_key="secret") assert client._s3_client is not None client._s3_client = None diff --git a/pysus/tests/api/test_client.py b/pysus/tests/api/test_client.py index 3546cdf..3662737 100644 --- a/pysus/tests/api/test_client.py +++ b/pysus/tests/api/test_client.py @@ -201,6 +201,7 @@ async def test_query_with_dataset(self, test_db_path, tmp_path): result = await client.query(dataset="sinan") mock_ducklake.query.assert_called_once_with( + client=None, dataset="sinan", group=None, state=None, @@ -227,6 +228,7 @@ async def test_query_with_group(self, test_db_path): await client.query(dataset="sinan", group="DENGUE") mock_ducklake.query.assert_called_once_with( + client=None, dataset="sinan", group="DENGUE", state=None, @@ -258,6 +260,7 @@ async def test_query_with_all_params(self, test_db_path): ) mock_ducklake.query.assert_called_once_with( + client=None, dataset="sinasc", group="DC", state="SP",