Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 47 additions & 12 deletions src/drs/commands/reflection.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,42 @@ async def create(client: DremioClient, path: str, rtype: str, display_fields: li
raise handle_api_error(exc) from exc


async def list_reflections(client: DremioClient, path: str) -> dict:
"""List reflections on a dataset via sys.project.reflections."""
parts = parse_path(path)
try:
entity = await client.get_catalog_by_path(parts)
except httpx.HTTPStatusError as exc:
raise handle_api_error(exc) from exc
dataset_id = entity["id"]
sql = f"SELECT * FROM sys.project.reflections WHERE dataset_id = '{dataset_id}'"
async def list_reflections(
client: DremioClient,
path: str | None = None,
*,
rtype: str | None = None,
status: str | None = None,
dataset_name: str | None = None,
limit: int | None = None,
) -> dict:
"""List reflections via sys.project.reflections.

When *path* is given, only reflections for that dataset are returned.
When omitted, all reflections in the project are returned.
Optional filters narrow results by *rtype*, *status*, or *dataset_name*.
An optional *limit* caps the number of rows returned.
"""
sql = "SELECT * FROM sys.project.reflections"
conditions: list[str] = []
if path is not None:
parts = parse_path(path)
try:
entity = await client.get_catalog_by_path(parts)
except httpx.HTTPStatusError as exc:
raise handle_api_error(exc) from exc
dataset_id = entity["id"]
conditions.append(f"dataset_id = '{dataset_id}'")
if rtype is not None:
conditions.append(f"type = '{rtype.upper()}'")
if status is not None:
conditions.append(f"status = '{status.upper()}'")
if dataset_name is not None:
conditions.append(f"dataset_name ILIKE '%{dataset_name}%'")
if conditions:
sql += " WHERE " + " AND ".join(conditions)
if limit is not None:
sql += f" LIMIT {limit}"
return await run_query(client, sql)


Expand Down Expand Up @@ -142,12 +169,20 @@ def cli_create(

@app.command("list")
def cli_list(
path: str = typer.Argument(help="Dot-separated dataset path"),
path: str = typer.Argument(None, help="Dot-separated dataset path (omit to list all reflections)"),
rtype: str = typer.Option(None, "--type", "-t", help="Filter by reflection type: raw or aggregation"),
status: str = typer.Option(None, "--status", "-s", help="Filter by status (e.g. CAN_ACCELERATE, FAILED, EXPIRED)"),
dataset_name: str = typer.Option(None, "--dataset-name", "-d", help="Filter by dataset name (substring match)"),
limit: int = typer.Option(None, "--limit", "-l", help="Maximum number of reflections to return"),
fmt: OutputFormat = typer.Option(OutputFormat.json, "--output", "-o", help="Output format"),
) -> None:
"""List all reflections defined on a dataset."""
"""List reflections. Shows all project reflections, or those for a specific dataset."""
client = _get_client()
_run_command(list_reflections(client, path), client, fmt)
_run_command(
list_reflections(client, path, rtype=rtype, status=status, dataset_name=dataset_name, limit=limit),
client,
fmt,
)


@app.command("get")
Expand Down
10 changes: 7 additions & 3 deletions src/drs/introspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,15 @@
"reflection.list": {
"group": "reflection",
"command": "list",
"description": "List all reflections defined on a dataset.",
"description": "List reflections. Shows all project reflections, or those for a specific dataset.",
"mechanism": "SQL",
"sql_template": "SELECT * FROM sys.project.reflections WHERE dataset_id = '{dataset_id}'",
"sql_template": "SELECT * FROM sys.project.reflections [WHERE ...] [LIMIT {limit}]",
"parameters": [
{"name": "path", "type": "string", "required": True, "positional": True},
{"name": "path", "type": "string", "required": False, "positional": True},
{"name": "type", "type": "string", "required": False, "flag": "--type/-t"},
{"name": "status", "type": "string", "required": False, "flag": "--status/-s"},
{"name": "dataset_name", "type": "string", "required": False, "flag": "--dataset-name/-d"},
{"name": "limit", "type": "integer", "required": False, "flag": "--limit/-l"},
{"name": "output", "type": "enum", "required": False, "default": "json", "enum": ["json", "csv", "pretty"]},
],
},
Expand Down
83 changes: 81 additions & 2 deletions tests/test_commands/test_reflection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,90 @@

from __future__ import annotations

from unittest.mock import AsyncMock
from unittest.mock import AsyncMock, patch

import pytest

from drs.commands.reflection import delete, get_reflection, refresh
from drs.commands.reflection import delete, get_reflection, list_reflections, refresh

QUERY_RESULT = {"job_id": "j1", "state": "COMPLETED", "rowCount": 2, "rows": [{"id": "r1"}, {"id": "r2"}]}


@pytest.mark.asyncio
async def test_list_reflections_all(mock_client) -> None:
"""Omitting path queries all reflections without a WHERE clause."""
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
result = await list_reflections(mock_client)
mock_rq.assert_called_once_with(mock_client, "SELECT * FROM sys.project.reflections")
assert result["rowCount"] == 2


@pytest.mark.asyncio
async def test_list_reflections_for_dataset(mock_client) -> None:
"""Providing a path filters by dataset_id."""
mock_client.get_catalog_by_path = AsyncMock(return_value={"id": "ds-123"})
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
result = await list_reflections(mock_client, path="space.my_table")
mock_rq.assert_called_once_with(mock_client, "SELECT * FROM sys.project.reflections WHERE dataset_id = 'ds-123'")
assert result["rowCount"] == 2


@pytest.mark.asyncio
async def test_list_reflections_with_limit(mock_client) -> None:
"""--limit appends a SQL LIMIT clause."""
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
await list_reflections(mock_client, limit=50)
mock_rq.assert_called_once_with(mock_client, "SELECT * FROM sys.project.reflections LIMIT 50")


@pytest.mark.asyncio
async def test_list_reflections_dataset_with_limit(mock_client) -> None:
"""Both path and limit combine WHERE and LIMIT."""
mock_client.get_catalog_by_path = AsyncMock(return_value={"id": "ds-456"})
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
await list_reflections(mock_client, path="space.ds", limit=10)
mock_rq.assert_called_once_with(
mock_client, "SELECT * FROM sys.project.reflections WHERE dataset_id = 'ds-456' LIMIT 10"
)


@pytest.mark.asyncio
async def test_list_reflections_filter_by_type(mock_client) -> None:
"""--type filters by reflection type."""
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
await list_reflections(mock_client, rtype="raw")
mock_rq.assert_called_once_with(mock_client, "SELECT * FROM sys.project.reflections WHERE type = 'RAW'")


@pytest.mark.asyncio
async def test_list_reflections_filter_by_status(mock_client) -> None:
"""--status filters by reflection status."""
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
await list_reflections(mock_client, status="failed")
mock_rq.assert_called_once_with(mock_client, "SELECT * FROM sys.project.reflections WHERE status = 'FAILED'")


@pytest.mark.asyncio
async def test_list_reflections_filter_by_dataset_name(mock_client) -> None:
"""--dataset-name filters with ILIKE substring match."""
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
await list_reflections(mock_client, dataset_name="orders")
mock_rq.assert_called_once_with(
mock_client, "SELECT * FROM sys.project.reflections WHERE dataset_name ILIKE '%orders%'"
)


@pytest.mark.asyncio
async def test_list_reflections_combined_filters(mock_client) -> None:
"""Multiple filters combine with AND."""
mock_client.get_catalog_by_path = AsyncMock(return_value={"id": "ds-789"})
with patch("drs.commands.reflection.run_query", new_callable=AsyncMock, return_value=QUERY_RESULT) as mock_rq:
await list_reflections(mock_client, path="space.ds", rtype="raw", status="can_accelerate", limit=5)
mock_rq.assert_called_once_with(
mock_client,
"SELECT * FROM sys.project.reflections"
" WHERE dataset_id = 'ds-789' AND type = 'RAW' AND status = 'CAN_ACCELERATE' LIMIT 5",
)


@pytest.mark.asyncio
Expand Down
Loading