Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions src/nemosis/data_fetch_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def cache_compiler(
select_columns=None,
fformat="feather",
rebuild=False,
keep_csv=False,
keep_csv=True,
**kwargs,
):
"""
Expand Down Expand Up @@ -184,7 +184,7 @@ def cache_compiler(
rebuild (bool): If True then cache files are rebuilt
even if they exist already. False by default.
keep_csv (bool): If True raw CSVs from AEMO are not deleted after
the cache is built. False by default
the cache is built. True by default
**kwargs: additional arguments passed to the pd.to_{fformat}() function

Returns:
Expand All @@ -195,7 +195,10 @@ def cache_compiler(
raise UserInputError("The raw_data_location provided is None.")

if not _os.path.isdir(raw_data_location):
raise UserInputError("The raw_data_location provided does not exist.")
if _os.path.isfile(raw_data_location):
raise UserInputError(f"The raw_data_location {raw_data_location} provided exists as a file, not directory.")
else:
_os.makedirs(raw_data_location)

if table_name not in _defaults.dynamic_tables:
raise UserInputError("Table name provided is not a dynamic table.")
Expand Down Expand Up @@ -778,11 +781,17 @@ def _write_to_format(data, fformat, full_filename, write_kwargs):
if _os.path.isfile(full_filename) and fformat != "csv":
_os.unlink(full_filename)
# Write to required format
if fformat == "feather":
write_function[fformat](full_filename, **write_kwargs)
elif fformat == "parquet":
write_function[fformat](full_filename, index=False, **write_kwargs)
return
try:
if fformat == "feather":
write_function[fformat](full_filename, **write_kwargs)
elif fformat == "parquet":
write_function[fformat](full_filename, index=False, **write_kwargs)
return
except Exception:
# tidy up incomplete file
if _os.path.isfile(full_filename):
_os.unlink(full_filename)
raise


def _download_data(
Expand Down
129 changes: 126 additions & 3 deletions tests/end_to_end_table_tests/test_cache_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
that writes typed feather / parquet files to disk for downstream consumers.

Verifies (a) the round-trip through cache → dynamic_data_compiler preserves
typed columns and (b) `select_columns` narrows the cached file itself, not
just the returned frame.
typed columns, (b) `select_columns` narrows the cached file itself, not
just the returned frame, (c) cache directory is auto-created when missing,
(d) keep_csv=True is the default behaviour, and (e) partial cache files
are cleaned up when a feather/parquet write fails mid-flight.
"""
import pandas as pd
import pytest

from nemosis import cache_compiler, dynamic_data_compiler
from nemosis import cache_compiler, data_fetch_methods, dynamic_data_compiler
from nemosis.custom_errors import UserInputError


START = "2018/05/01 00:00:00"
Expand Down Expand Up @@ -57,3 +60,123 @@ def test_select_columns_narrows_cached_file(nemosis_fixture):
for path in parquet_files:
on_disk = pd.read_parquet(path)
assert set(on_disk.columns) == {"SETTLEMENTDATE", "REGIONID"}, path


def test_creates_cache_directory_when_missing(nemosis_fixture):
"""cache_compiler builds caches — making the user mkdir first is needless
friction. If the destination is missing, create it."""
target = nemosis_fixture / "new_subdir_that_does_not_exist"
assert not target.exists()

cache_compiler(
start_time=START, end_time=END,
table_name="DISPATCHPRICE",
raw_data_location=str(target),
)

assert target.is_dir()
assert list(target.glob("*.feather")), "cache should be populated"


def test_raises_when_cache_path_is_a_file(nemosis_fixture):
"""A path that points at a regular file is clearly a typo, not a
cache directory to create — surface that as UserInputError."""
not_a_dir = nemosis_fixture / "oops.txt"
not_a_dir.write_text("I am not a directory")

with pytest.raises(UserInputError, match="exists as a file"):
cache_compiler(
start_time=START, end_time=END,
table_name="DISPATCHPRICE",
raw_data_location=str(not_a_dir),
)


def test_keep_csv_true_by_default_keeps_fetched_csv(nemosis_fixture):
"""When cache_compiler actually has to fetch (no existing feather, so
the code path that downloads + extracts a CSV runs), the default
keep_csv=True must leave the extracted CSV on disk alongside the
feather. rebuild=True forces the fetch path so this test isn't
dependent on tmp_path being empty at start.

AEMO zips contain CSV files with an uppercase .CSV extension —
NEMOSIS handles this internally with [cC][sS][vV] globs and the
test does the same."""
cache_compiler(
start_time=START, end_time=END,
table_name="DISPATCHPRICE",
raw_data_location=str(nemosis_fixture),
rebuild=True,
# no keep_csv kwarg — exercises the default
)
csv_files = list(nemosis_fixture.glob("*DISPATCHPRICE*.[Cc][Ss][Vv]"))
assert csv_files, "default keep_csv=True should retain the fetched CSV"


def test_keep_csv_false_removes_fetched_csv(nemosis_fixture):
"""Mirror of the above with the override — verifies the opt-out
path still works (the source-side delete in _dynamic_data_fetch_loop
fires only when keep_csv is False)."""
cache_compiler(
start_time=START, end_time=END,
table_name="DISPATCHPRICE",
raw_data_location=str(nemosis_fixture),
rebuild=True,
keep_csv=False,
)
csv_files = list(nemosis_fixture.glob("*DISPATCHPRICE*.[Cc][Ss][Vv]"))
assert not csv_files, "keep_csv=False should remove the fetched CSV"


def test_existing_feather_means_no_csv_is_fetched(nemosis_fixture):
"""If the feather is already in the cache, cache_compiler must take
the "already compiled" short-circuit and not fetch a CSV — keep_csv
is only about retaining a CSV we actually downloaded, not about
creating one out of thin air. Pre-populate empty feather files at
the expected filenames (in caching_mode the existence check skips
the read), call cache_compiler without rebuild, and verify no CSV
appeared."""
# April + May because NEMOSIS uses a 1-day buffer-back, so a query
# starting 2018-05-01 also scans the 2018-04 archive.
for month in ("201804", "201805"):
(nemosis_fixture / f"PUBLIC_DVD_DISPATCHPRICE_{month}010000.feather").touch()

cache_compiler(
start_time=START, end_time=END,
table_name="DISPATCHPRICE",
raw_data_location=str(nemosis_fixture),
# default keep_csv=True — would matter if a CSV was fetched
)

csv_files = list(nemosis_fixture.glob("*DISPATCHPRICE*.[Cc][Ss][Vv]"))
assert not csv_files, (
"keep_csv=True should NOT cause a CSV to be created when the "
"feather already exists — the CSV branch must not run at all"
)


@pytest.mark.parametrize("fformat", ["feather", "parquet"])
def test_write_to_format_cleans_up_partial_file_on_failure(tmp_path, monkeypatch, fformat):
"""A mid-write failure (e.g. disk full) used to leave a partial,
unreadable feather/parquet on disk. Subsequent runs would then trip
on the corrupt file. _write_to_format now removes the partial file
in its except branch before re-raising the original exception."""
target = tmp_path / f"x.{fformat}"
df = pd.DataFrame({"a": [1, 2, 3]})

method = "to_feather" if fformat == "feather" else "to_parquet"

def fake_write(self, path, **kwargs):
# Simulate a partial write — bytes hit disk before the writer errors.
with open(path, "wb") as f:
f.write(b"partial-bytes-from-failed-write")
raise IOError("simulated disk full mid-write")

monkeypatch.setattr(pd.DataFrame, method, fake_write)

with pytest.raises(IOError, match="simulated disk full"):
data_fetch_methods._write_to_format(df, fformat, str(target), {})

assert not target.exists(), (
f"partial {fformat} file should have been cleaned up after the write failure"
)
Loading