From 8420fec715ba7508add1e2e3779cca805b542870 Mon Sep 17 00:00:00 2001 From: Matthew Davis Date: Mon, 25 May 2026 13:57:26 +1000 Subject: [PATCH 1/4] Stream zip downloads to disk and cache them between runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cherry-picked from #67's 2a5812f. Two related behaviours land here: 1. Streamed downloads (closes the foundation for #55-style cleanup on partial writes — wired up in a follow-up commit). A new `download_to_path(url, path)` helper opens the response with `stream=True` and writes chunks to disk via `iter_content`, so large zips never have to fit in memory. `download_to_dir(url, dir)` layers on top, deriving the filename from the URL. 2. Zip retention (#56). All zip downloads now land on disk and stay there. Previously the zip was read into an `io.BytesIO` and discarded after extraction; with the new design the zip persists, so a subsequent call against the same cache can skip the network and just re-extract from local. `download_to_path` is idempotent on the existence of the destination — if the zip is there, the network call is short-circuited. `download_unzip_csv`, the four `_download_and_unpack_*` functions, `download_csv`, `download_xl`, and `download_elements_file` are refactored to use the new helpers. Trimmed from PR #67's original cherry-pick: - The TTL-cached HTML parent-page pre-check (`_pre_check_file_is_missing`, `download_html`, `download_html_as_soup`, the cachetools dep) — separate feature, will be PR-C. - The PR #67 `download_xl → download_xml` rename — was already corrected to `download_xlsx` in #84. Behaviour change on `keep_csv=False`: the CSV is still removed as documented, but the .zip stays. That's the whole point of #56 (Matt's slow-internet use case). Users wanting a truly empty cache have no opt-out yet — `keep_zip` parameter is added in the next commit on this branch. 
The pre-existing `test_keep_csv_false_leaves_cache_empty` test was written before #56 landed and asserted today's "no zip on disk" contract; renamed to `test_keep_csv_false_removes_csv` and the assertion narrowed to "no CSV", consistent with the new default. Bugs in PR #67's original code are preserved as-authored for clarity of authorship; fixes follow in the next commit: - `download_to_dir`'s `force_redo` parameter is accepted but never forwarded to `download_to_path`. - `download_to_path` has a "temporary debugging" log line for `.zip` URLs. - `download_to_path` contains a per-request `aemo.com.au` UA override rendered redundant by PR #85's uniform Chrome 130 session default. - Several `except KeyboardInterrupt: raise` blocks were added in the run dispatchers; KeyboardInterrupt isn't a subclass of Exception in Py3 so these are no-ops. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/nemosis/downloader.py | 319 +++++++++++------- .../test_fformat_csv.py | 16 +- 2 files changed, 201 insertions(+), 134 deletions(-) diff --git a/src/nemosis/downloader.py b/src/nemosis/downloader.py index bfca69b..1634765 100644 --- a/src/nemosis/downloader.py +++ b/src/nemosis/downloader.py @@ -5,6 +5,7 @@ import zipfile import io import pandas as pd +from urllib.parse import urlparse from . 
import defaults, custom_errors @@ -37,9 +38,11 @@ def run(year, month, day, chunk, index, filename_stub, down_load_to): # Perform the download, unzipping saving of the file try: download_unzip_csv(url_formatted, down_load_to) - except Exception: + except KeyboardInterrupt: + raise + except Exception as e: if chunk == 1: - logger.warning(f"{filename_stub} not downloaded") + logger.warning(f"{filename_stub} not downloaded ({e})") def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to): @@ -54,8 +57,10 @@ def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to): _download_and_unpack_bid_move_complete_files( download_url, down_load_to ) - except Exception: - logger.warning(f"{filename_stub} not downloaded") + except KeyboardInterrupt: + raise + except Exception as e: + logger.warning(f"{filename_stub} not downloaded ({e})") def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, down_load_to): @@ -67,8 +72,10 @@ def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, do _download_and_unpack_next_region_tables( download_url, down_load_to ) - except Exception: - logger.warning(f"{filename_stub} not downloaded") + except KeyboardInterrupt: + raise + except Exception as e: + logger.warning(f"{filename_stub} not downloaded ({e})") def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down_load_to): @@ -78,8 +85,10 @@ def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down filename_stub, defaults.current_data_page_urls["NEXT_DAY_DISPATCHLOAD"]) _download_and_unpack_next_dispatch_load_files_complete_files(download_url, down_load_to) - except Exception: - logger.warning(f"{filename_stub} not downloaded") + except KeyboardInterrupt: + raise + except Exception as e: + logger.warning(f"{filename_stub} not downloaded ({e})") def run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, down_load_to): @@ -88,8 +97,10 @@ def 
run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, do filename_stub, defaults.current_data_page_urls["INTERMITTENT_GEN_SCADA"]) _download_and_unpack_intermittent_gen_scada_file(download_url, down_load_to) - except Exception: - logger.warning(f"{filename_stub} not downloaded") + except KeyboardInterrupt: + raise + except Exception as e: + logger.warning(f"{filename_stub} not downloaded ({e})") def _get_current_url(filename_stub, current_page_url): @@ -102,120 +113,116 @@ def _get_current_url(filename_stub, current_page_url): def _download_and_unpack_bid_move_complete_files( download_url, down_load_to ): - r = session.get(download_url) - r.raise_for_status() - zipped_file = zipfile.ZipFile(io.BytesIO(r.content)) - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 2 - ) - csv_file = zipped_file.open(file_name) - BIDDAYOFFER_D = pd.read_csv( - csv_file, header=1, nrows=start_row_second_table - 3, dtype=str - ) - BIDDAYOFFER_D.to_csv( - os.path.join( - down_load_to, - "PUBLIC_DVD_BIDDAYOFFER_D_" + file_name[24:32] + ".csv", - ), - index=False, - ) - csv_file = zipped_file.open(file_name) - BIDPEROFFER_D = pd.read_csv( - csv_file, header=start_row_second_table - 1, dtype=str - )[:-1] - BIDPEROFFER_D.to_csv( - os.path.join( - down_load_to, - "PUBLIC_DVD_BIDPEROFFER_D_" + file_name[24:32] + ".csv", - ), - index=False, - ) + zip_local_path = download_to_dir(download_url, down_load_to) + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 2 + ) + csv_file = zipped_file.open(file_name) + BIDDAYOFFER_D = pd.read_csv( + csv_file, header=1, nrows=start_row_second_table - 3, dtype=str + ) + BIDDAYOFFER_D.to_csv( + os.path.join( + 
down_load_to, + "PUBLIC_DVD_BIDDAYOFFER_D_" + file_name[24:32] + ".csv", + ), + index=False, + ) + csv_file = zipped_file.open(file_name) + BIDPEROFFER_D = pd.read_csv( + csv_file, header=start_row_second_table - 1, dtype=str + )[:-1] + BIDPEROFFER_D.to_csv( + os.path.join( + down_load_to, + "PUBLIC_DVD_BIDPEROFFER_D_" + file_name[24:32] + ".csv", + ), + index=False, + ) def _download_and_unpack_next_region_tables( download_url, down_load_to ): - r = session.get(download_url) - r.raise_for_status() - zipped_file = zipfile.ZipFile(io.BytesIO(r.content)) + zip_local_path = download_to_dir(download_url, down_load_to) + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 2 + ) + start_row_third_table = _find_start_row_nth_table( + zipped_file, file_name, 3 + ) + csv_file = zipped_file.open(file_name) + DAILY_REGION_SUMMARY = pd.read_csv( + csv_file, header=start_row_second_table - 1, + nrows=start_row_third_table - start_row_second_table - 1, dtype=str + ) + DAILY_REGION_SUMMARY.to_csv( + os.path.join( + down_load_to, + "PUBLIC_DAILY_REGION_SUMMARY_" + file_name[13:21] + ".csv", + ), + index=False, + ) - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 2 - ) - start_row_third_table = _find_start_row_nth_table( - zipped_file, file_name, 3 - ) - csv_file = zipped_file.open(file_name) - DAILY_REGION_SUMMARY = pd.read_csv( - csv_file, header=start_row_second_table - 1, - nrows=start_row_third_table - start_row_second_table - 1, dtype=str - ) - DAILY_REGION_SUMMARY.to_csv( - os.path.join( - down_load_to, - "PUBLIC_DAILY_REGION_SUMMARY_" + file_name[13:21] + ".csv", - ), - index=False, - ) - def _download_and_unpack_next_dispatch_load_files_complete_files( 
download_url, down_load_to ): - r = session.get(download_url) - r.raise_for_status() - zipped_file = zipfile.ZipFile(io.BytesIO(r.content)) - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 2 - ) - csv_file = zipped_file.open(file_name) - NEXT_DAY_DISPATCHLOAD = pd.read_csv( - csv_file, header=1, nrows=start_row_second_table - 3, dtype=str - ) - NEXT_DAY_DISPATCHLOAD.to_csv( - os.path.join( - down_load_to, - "PUBLIC_NEXT_DAY_DISPATCHLOAD_" + file_name[25:33] + ".csv", - ), - index=False, - ) + zip_local_path = download_to_dir(download_url, down_load_to) + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 2 + ) + csv_file = zipped_file.open(file_name) + NEXT_DAY_DISPATCHLOAD = pd.read_csv( + csv_file, header=1, nrows=start_row_second_table - 3, dtype=str + ) + NEXT_DAY_DISPATCHLOAD.to_csv( + os.path.join( + down_load_to, + "PUBLIC_NEXT_DAY_DISPATCHLOAD_" + file_name[25:33] + ".csv", + ), + index=False, + ) def _download_and_unpack_intermittent_gen_scada_file( download_url, down_load_to ): - r = session.get(download_url) - r.raise_for_status() - zipped_file = zipfile.ZipFile(io.BytesIO(r.content)) - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 1 - ) - csv_file = zipped_file.open(file_name) - data = pd.read_csv( - csv_file, header=1, dtype=str - )[:-1] - data.to_csv( - os.path.join( - down_load_to, - "PUBLIC_NEXT_DAY_INTERMITTENT_GEN_SCADA_" + file_name[39:47] + ".csv", - ), - index=False, - ) + zip_local_path = download_to_dir(download_url, down_load_to) + with zipfile.ZipFile(zip_local_path) as zipped_file: + + 
file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 1 + ) + csv_file = zipped_file.open(file_name) + data = pd.read_csv( + csv_file, header=1, dtype=str + )[:-1] + data.to_csv( + os.path.join( + down_load_to, + "PUBLIC_NEXT_DAY_INTERMITTENT_GEN_SCADA_" + file_name[39:47] + ".csv", + ), + index=False, + ) def _find_start_row_nth_table(sub_folder_zipfile, file_name, n): @@ -247,27 +254,90 @@ def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to): # Perform the download, unzipping saving of the file try: download_unzip_csv(url_formatted_latest, down_load_to) + except KeyboardInterrupt: + raise except Exception: try: download_unzip_csv(url_formatted_hist, down_load_to) + except KeyboardInterrupt: + raise except Exception as e: # FCAS csvs are bundled in 30 minute bundles # Check if the csv exists before warning file_check = os.path.join(down_load_to, filename_stub + ".csv") if not os.path.isfile(file_check): - logger.warning(f"{filename_stub} not downloaded") + logger.warning(f"{filename_stub} not downloaded {(e)}") + +def download_to_dir(url, down_load_to, force_redo=False): + """ + This function downloads a file from a url to a folder. + It streams it, so that large files do not fill up memory. + An exception is thrown if the url does not exist. + """ + + url = url.replace('#', '%23') + filename = url.split('/')[-1].split('?')[0] + path = os.path.join(down_load_to, filename) + download_to_path(url, path) + return path +def download_to_path(url, path_and_name, force_redo=False): + """ + This function downloads a file from a url to a specific path. + It streams it, so that large files do not fill up memory. + An exception is thrown if the url does not exist. 
+ """ + url = url.replace('#', '%23') + filename = url.split('/')[-1].split('?')[0] + if (not os.path.exists(path_and_name)) or force_redo: + + # Most headers should be set via the session + # This is just for per-request variation + headers = {} + + # For the few files that are hosted outside the usual dataset, + # AEMO discriminates based on user agent, blocking scrapers. + # So let's pretend we're a normal web browser. + # AEMO, if you see this and don't like it, + # then just move the participant registration spreadsheet + # into the usual MMS nemweb dataset, + # at https://nemweb.com.au/ instead of https://www.aemo.com.au + # ideally as the same CSV format everything else is. + # That would make life easier for both of us. + # https://github.com/UNSW-CEEM/NEMOSIS/issues/60 + domain = urlparse(url).netloc + if domain.endswith("aemo.com.au"): + chrome_user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + headers["User-Agent"] = chrome_user_agent + + with session.get(url, stream=True, headers=headers) as response: + if response.status_code not in [200, 404]: + logger.debug(f"URL {url} gave status {response.status_code}: {response.text[:1000]}") + response.raise_for_status() + try: + with open(path_and_name, 'wb') as file: + for chunk in response.iter_content(chunk_size=2**13): + file.write(chunk) + except KeyboardInterrupt: + raise + except Exception as e: + logger.error(f"Failed to write file to {path_and_name}: {e}") + if os.path.exists(path_and_name): + os.remove(path_and_name) + raise + + # temporary debugging + if url.lower().endswith('.zip'): + logger.info(f"Zip downloaded to {path_and_name}") def download_unzip_csv(url, down_load_to): """ This function downloads a zipped csv using a url, extracts the csv and saves it a specified location """ - url = url.replace('#', '%23') - r = session.get(url) - r.raise_for_status() - z = zipfile.ZipFile(io.BytesIO(r.content)) - 
z.extractall(down_load_to) + zip_local_path = download_to_dir(url, down_load_to) + with zipfile.ZipFile(zip_local_path) as z: + z.extractall(down_load_to) def download_csv(url, path_and_name): @@ -275,10 +345,7 @@ def download_csv(url, path_and_name): This function downloads a zipped csv using a url, extracts the csv and saves it a specified location """ - r = session.get(url) - r.raise_for_status() - with open(path_and_name, "wb") as f: - f.write(r.content) + download_to_path(url, path_and_name) def download_elements_file(url, path_and_name): @@ -288,21 +355,16 @@ def download_elements_file(url, path_and_name): links = soup.find_all("a") last_file_name = links[-1].text link = url + last_file_name - r = session.get(link) - r.raise_for_status() - with open(path_and_name, "wb") as f: - f.write(r.content) + download_to_path(link, path_and_name) + def download_xl(url, path_and_name): """ This function downloads a zipped csv using a url, extracts the csv and saves it a specified location """ - r = session.get(url) - r.raise_for_status() - with open(path_and_name, "wb") as f: - f.write(r.content) + download_to_path(url, path_and_name) def format_aemo_url(url, year, month, filename_stub): @@ -314,7 +376,6 @@ def format_aemo_url(url, year, month, filename_stub): year = str(year) return url.format(year, year, month, filename_stub) - def status_code_return(url): r = session.get(url) return r.status_code @@ -328,4 +389,4 @@ def _get_matching_link(url, stub_link): for link in links: if stub_link in link: return link - logger.warning(f"{stub_link} not downloaded") + logger.warning(f"{stub_link} not downloaded because no match for {stub_link} was found on {url}") diff --git a/tests/end_to_end_table_tests/test_fformat_csv.py b/tests/end_to_end_table_tests/test_fformat_csv.py index d99d21a..3f0825a 100644 --- a/tests/end_to_end_table_tests/test_fformat_csv.py +++ b/tests/end_to_end_table_tests/test_fformat_csv.py @@ -28,11 +28,16 @@ def 
test_csv_fformat_round_trip(nemosis_fixture): assert not all(data.dtypes == "object") -def test_keep_csv_false_leaves_cache_empty(nemosis_fixture): +def test_keep_csv_false_removes_csv(nemosis_fixture): """With `fformat="csv"` and `keep_csv=False`, the raw CSV is fetched, - used to produce the return frame, and then deleted — the cache dir - should be empty afterwards. This is the opt-out path for users who - don't want NEMOSIS accumulating files on disk.""" + used to produce the return frame, and then deleted. + + Note: the downloaded .zip is retained by default (see #56 — Matt's + use case is slow internet, so caching the compressed archive + avoids re-downloading on subsequent runs). Users who want a truly + empty cache can pass keep_zip=False — covered by + test_keep_zip_false_removes_zip in tests/test_downloader.py. + """ dynamic_data_compiler( start_time="2018/05/01 00:00:00", end_time="2018/05/01 00:30:00", @@ -42,4 +47,5 @@ def test_keep_csv_false_leaves_cache_empty(nemosis_fixture): keep_csv=False, select_columns=["SETTLEMENTDATE", "REGIONID", "RRP"], ) - assert list(nemosis_fixture.iterdir()) == [] + csv_files = list(nemosis_fixture.glob("*.[Cc][Ss][Vv]")) + assert csv_files == [], f"keep_csv=False should remove the CSV; found: {csv_files}" From 5735f07212112989a300ed16a3f6ac437c9c009d Mon Sep 17 00:00:00 2001 From: nick-gorman Date: Mon, 25 May 2026 14:13:09 +1000 Subject: [PATCH 2/4] Tidy up the streaming download code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five small clean-ups to the helpers introduced in the previous commit, plus two pre-existing bare-except bugs they happened to expose. Want to flag these for @mdavis-xyz to confirm — the KeyboardInterrupt discussion in particular involves a deliberate-looking design call of his that I'm reading as a no-op in Py3 (see PR body for analysis). In downloader.py: 1. 
Fix `download_to_dir`'s `force_redo` parameter — it was accepted in the signature but never forwarded to `download_to_path`, so the kwarg silently did nothing. Now plumbed through. 2. Drop the per-request `aemo.com.au` Chrome 120 User-Agent override in `download_to_path`. The session-level Chrome 130 UA from PR #85 already covers all hosts uniformly; the per-domain split is redundant. `urlparse` import dropped along with it. 3. Drop the `if response.status_code not in [200, 404]: logger.debug(...)` line. `raise_for_status()` on the next line already raises on every 4xx/5xx response; the debug log was suppressing log noise for 404s (legitimate "this archive doesn't exist yet" cases) but at the cost of a noisy `response.text[:1000]` body slice on every other status — including non-200 2xx successes (e.g. 206), which is wrong. 4. Drop the "# temporary debugging" `logger.info("Zip downloaded to ...")` line. Was marked as temporary in the original code. 5. Drop the inner `except KeyboardInterrupt: raise` in `download_to_path`'s write loop. Same reasoning as the 7 others I removed: KeyboardInterrupt is a sibling of Exception (not a subclass) in Py3, so `except Exception:` below it already lets KI propagate. The handler is structurally a no-op. In data_fetch_methods.py and custom_tables.py: 6. Two pre-existing bare `except:` clauses changed to `except Exception:`. These were the real cause of the KI-doesn't-propagate concern Matt was trying to address with his handlers in #67: the bare-except in `static_table`'s download dispatch would swallow KI and convert it to `NoDataToReturn`, regardless of what the inner functions did. Narrowing it to `except Exception:` lets KI propagate naturally — which makes the inner KI handlers genuinely unnecessary rather than just structurally no-op. The bare except in `custom_tables.py:performance_at_nodal_peak` is a different shape (catches into `x = 1` defensive code, not re-raise) and arguably a separate bug, but the same fix applies and it's a one-line change. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/nemosis/custom_tables.py | 2 +- src/nemosis/data_fetch_methods.py | 2 +- src/nemosis/downloader.py | 81 +++++++++---------------------- 3 files changed, 24 insertions(+), 61 deletions(-) diff --git a/src/nemosis/custom_tables.py b/src/nemosis/custom_tables.py index eaec94d..429a1c9 100644 --- a/src/nemosis/custom_tables.py +++ b/src/nemosis/custom_tables.py @@ -200,7 +200,7 @@ def performance_at_nodal_peak(capacity_and_scada_grouped): index_max = capacity_and_scada_grouped["TOTALDEMAND"].idxmax() try: output_at_peak = capacity_and_scada_grouped["SCADAVALUE"][index_max] - except: + except Exception: x = 1 if np.isnan(output_at_peak): output_at_peak = 0 diff --git a/src/nemosis/data_fetch_methods.py b/src/nemosis/data_fetch_methods.py index 461e953..b6864e6 100644 --- a/src/nemosis/data_fetch_methods.py +++ b/src/nemosis/data_fetch_methods.py @@ -298,7 +298,7 @@ def static_table( static_downloader_map[table_name]( _defaults.static_table_url[table_name], path_and_name ) - except: + except Exception: raise NoDataToReturn( ( f"Compiling data for table {table_name} failed. " diff --git a/src/nemosis/downloader.py b/src/nemosis/downloader.py index 1634765..8b423c0 100644 --- a/src/nemosis/downloader.py +++ b/src/nemosis/downloader.py @@ -5,7 +5,6 @@ import zipfile import io import pandas as pd -from urllib.parse import urlparse from . 
import defaults, custom_errors @@ -38,8 +37,6 @@ def run(year, month, day, chunk, index, filename_stub, down_load_to): # Perform the download, unzipping saving of the file try: download_unzip_csv(url_formatted, down_load_to) - except KeyboardInterrupt: - raise except Exception as e: if chunk == 1: logger.warning(f"{filename_stub} not downloaded ({e})") @@ -57,8 +54,6 @@ def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to): _download_and_unpack_bid_move_complete_files( download_url, down_load_to ) - except KeyboardInterrupt: - raise except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") @@ -72,8 +67,6 @@ def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, do _download_and_unpack_next_region_tables( download_url, down_load_to ) - except KeyboardInterrupt: - raise except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") @@ -85,8 +78,6 @@ def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down filename_stub, defaults.current_data_page_urls["NEXT_DAY_DISPATCHLOAD"]) _download_and_unpack_next_dispatch_load_files_complete_files(download_url, down_load_to) - except KeyboardInterrupt: - raise except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") @@ -97,8 +88,6 @@ def run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, do filename_stub, defaults.current_data_page_urls["INTERMITTENT_GEN_SCADA"]) _download_and_unpack_intermittent_gen_scada_file(download_url, down_load_to) - except KeyboardInterrupt: - raise except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") @@ -254,13 +243,9 @@ def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to): # Perform the download, unzipping saving of the file try: download_unzip_csv(url_formatted_latest, down_load_to) - except KeyboardInterrupt: - raise except Exception: try: download_unzip_csv(url_formatted_hist, down_load_to) - 
except KeyboardInterrupt: - raise except Exception as e: # FCAS csvs are bundled in 30 minute bundles # Check if the csv exists before warning @@ -274,61 +259,39 @@ def download_to_dir(url, down_load_to, force_redo=False): It streams it, so that large files do not fill up memory. An exception is thrown if the url does not exist. """ - url = url.replace('#', '%23') filename = url.split('/')[-1].split('?')[0] path = os.path.join(down_load_to, filename) - download_to_path(url, path) + download_to_path(url, path, force_redo=force_redo) return path + def download_to_path(url, path_and_name, force_redo=False): """ This function downloads a file from a url to a specific path. It streams it, so that large files do not fill up memory. - An exception is thrown if the url does not exist. + + Idempotent — if the destination already exists, the function + returns without re-downloading unless `force_redo=True`. On a + write failure mid-stream, the partial output file is removed + before the exception propagates. """ url = url.replace('#', '%23') - filename = url.split('/')[-1].split('?')[0] - if (not os.path.exists(path_and_name)) or force_redo: - - # Most headers should be set via the session - # This is just for per-request variation - headers = {} - - # For the few files that are hosted outside the usual dataset, - # AEMO discriminates based on user agent, blocking scrapers. - # So let's pretend we're a normal web browser. - # AEMO, if you see this and don't like it, - # then just move the participant registration spreadsheet - # into the usual MMS nemweb dataset, - # at https://nemweb.com.au/ instead of https://www.aemo.com.au - # ideally as the same CSV format everything else is. - # That would make life easier for both of us. 
- # https://github.com/UNSW-CEEM/NEMOSIS/issues/60 - domain = urlparse(url).netloc - if domain.endswith("aemo.com.au"): - chrome_user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" - headers["User-Agent"] = chrome_user_agent - - with session.get(url, stream=True, headers=headers) as response: - if response.status_code not in [200, 404]: - logger.debug(f"URL {url} gave status {response.status_code}: {response.text[:1000]}") - response.raise_for_status() - try: - with open(path_and_name, 'wb') as file: - for chunk in response.iter_content(chunk_size=2**13): - file.write(chunk) - except KeyboardInterrupt: - raise - except Exception as e: - logger.error(f"Failed to write file to {path_and_name}: {e}") - if os.path.exists(path_and_name): - os.remove(path_and_name) - raise - - # temporary debugging - if url.lower().endswith('.zip'): - logger.info(f"Zip downloaded to {path_and_name}") + if os.path.isfile(path_and_name) and not force_redo: + return + + with session.get(url, stream=True) as response: + response.raise_for_status() + try: + with open(path_and_name, 'wb') as file: + for chunk in response.iter_content(chunk_size=2**13): + file.write(chunk) + except Exception as e: + logger.error(f"Failed to write file to {path_and_name}: {e}") + if os.path.isfile(path_and_name): + os.unlink(path_and_name) + raise + def download_unzip_csv(url, down_load_to): """ From 980eed61474b160b24be1915df4fd358a734b4ca Mon Sep 17 00:00:00 2001 From: nick-gorman Date: Mon, 25 May 2026 14:32:48 +1000 Subject: [PATCH 3/4] Add keep_zip parameter and flip both raw-retention defaults to False MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related changes that finalise the streaming work: 1. Plumb a new `keep_zip` parameter through the call stack as the opt-in for #56's slow-internet zip caching. False by default. 2. 
Flip `keep_csv` default from True (the value PR #83 had landed) to False, so both raw-retention flags are independent opt-ins and the default state is the lean cache (only the typed feather/parquet remains, which is the "real" cache anyway). Behaviour matrix:

    keep_csv   keep_zip   what's left in the cache
    --------   --------   -----------------------------------
    F (deflt)  F (deflt)  feather/parquet only          ← lean
    F          T          feather/parquet + zip         ← #56's slow-internet caching
    T          F          feather/parquet + CSV         ← downstream CSV consumer
    T          T          feather/parquet + CSV + zip   ← full raw retention

## Plumbing `download_to_path` now returns True if a fresh network fetch happened, False if the destination already existed and was re-used. `download_to_dir` returns `(path, downloaded)`. This signal is what keeps `keep_zip=False` safe for concurrent same-cache use: cleanup only fires for zips THIS call actually wrote — pre-existing zips (from a previous call, or another concurrent process) are reported as `downloaded=False` and left untouched. Each leaf function (`download_unzip_csv` and the four `_download_and_unpack_*`) takes `keep_zip` and uses the `downloaded` flag from `download_to_dir` to decide whether its own zip should be cleaned up after extract. `keep_zip` is plumbed from `dynamic_data_compiler` / `cache_compiler` through `_dynamic_data_fetch_loop` → `_download_data` → the per-table dispatcher functions (`run`, `run_bid_tables`, …). ## Default flip The `keep_csv=True` default that PR #83 introduced is reverted here. That PR's rationale ("keep raw CSV so downstream consumers can re-read without re-fetching") still applies — but now the user can opt in explicitly via `keep_csv=True` instead of getting it implicitly. The default contract becomes: NEMOSIS only retains the artifact it was specifically asked for (the typed feather/parquet); raw AEMO files are transient unless the user explicitly asks for them. 
## Test changes - `test_keep_csv_true_by_default_keeps_fetched_csv` renamed to `test_keep_csv_true_retains_csv` and switched to explicit `keep_csv=True` — covers the opt-in path. - New `test_keep_csv_default_is_false` covers the new default behaviour. - New `test_keep_zip_default_is_false` and `test_keep_zip_true_retains_zip` cover the two `keep_zip` paths. - New `test_cached_zip_extracts_without_network` proves the #56 benefit end-to-end: with `keep_zip=True`, a second call that's missing the feather can rebuild from the cached zip without any network — verified by monkeypatching `aemo_mms_url` to a dead address mid-test. - New unit tests in `tests/test_downloader.py` for the helper-level contracts: `download_to_path` returns True on fresh fetch / False on cache hit / True with `force_redo=True`, and `download_to_dir` forwards `force_redo` (regression test for the PR #67 bug where the kwarg was accepted but never forwarded). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/nemosis/data_fetch_methods.py | 37 ++- src/nemosis/downloader.py | 296 ++++++++++-------- .../test_cache_compiler.py | 103 +++++- tests/test_downloader.py | 74 +++++ 4 files changed, 356 insertions(+), 154 deletions(-) diff --git a/src/nemosis/data_fetch_methods.py b/src/nemosis/data_fetch_methods.py index b6864e6..96c5d57 100644 --- a/src/nemosis/data_fetch_methods.py +++ b/src/nemosis/data_fetch_methods.py @@ -24,7 +24,8 @@ def dynamic_data_compiler( filter_cols=None, filter_values=None, fformat="feather", - keep_csv=True, + keep_csv=False, + keep_zip=False, parse_data_types=True, rebuild=False, **kwargs, @@ -51,7 +52,14 @@ def dynamic_data_compiler( Stored parquet and feather files will store columns as object type (compatbile with GUI use). For type inference for a cache, use cache_compiler. - keep_csv (bool): retains CSVs in cache. + keep_csv (bool): If True, raw CSVs from AEMO are retained in the + cache directory after the typed feather/parquet + is written. 
False by default — lean cache. + keep_zip (bool): If True, downloaded AEMO archive zips are + retained in the cache directory after extraction. + False by default. Set True on slow connections + to avoid re-downloading on subsequent runs + (see #56). data_merge (bool): concatenate DataFrames and return one DataFrame. If False, will not return any data. parse_data_types (bool): infers data types of columns when reading @@ -112,6 +120,7 @@ def dynamic_data_compiler( date_filter, fformat=fformat, keep_csv=keep_csv, + keep_zip=keep_zip, rebuild=rebuild, write_kwargs=kwargs, ) @@ -156,7 +165,8 @@ def cache_compiler( select_columns=None, fformat="feather", rebuild=False, - keep_csv=True, + keep_csv=False, + keep_zip=False, **kwargs, ): """ @@ -183,8 +193,14 @@ def cache_compiler( type inference for a cache, use cache_compiler. rebuild (bool): If True then cache files are rebuilt even if they exist already. False by default. - keep_csv (bool): If True raw CSVs from AEMO are not deleted after - the cache is built. True by default + keep_csv (bool): If True, raw CSVs from AEMO are retained + alongside the typed feather/parquet after the + cache is built. False by default — lean cache. + keep_zip (bool): If True, downloaded AEMO archive zips are + retained in the cache directory after + extraction. False by default. Set True on + slow connections to avoid re-downloading on + subsequent runs (see #56). 
**kwargs: additional arguments passed to the pd.to_{fformat}() function Returns: @@ -237,6 +253,7 @@ def cache_compiler( date_filter=None, fformat=fformat, keep_csv=keep_csv, + keep_zip=keep_zip, caching_mode=True, rebuild=rebuild, write_kwargs=kwargs, @@ -544,7 +561,8 @@ def _dynamic_data_fetch_loop( select_columns, date_filter, fformat="feather", - keep_csv=True, + keep_csv=False, + keep_zip=False, caching_mode=False, rebuild=False, write_kwargs={}, @@ -595,6 +613,7 @@ def _dynamic_data_fetch_loop( chunk, index, raw_data_location, + keep_zip=keep_zip, ) if _glob.glob(full_filename) and fformat != "csv" and not rebuild: @@ -795,7 +814,8 @@ def _write_to_format(data, fformat, full_filename, write_kwargs): def _download_data( - table_name, table_type, filename_stub, day, month, year, chunk, index, raw_data_location + table_name, table_type, filename_stub, day, month, year, chunk, index, raw_data_location, + keep_zip=False, ): """ Dispatch table to downloader to be downloaded. @@ -820,7 +840,8 @@ def _download_data( ) _processing_info_maps.downloader[table_type]( - year, month, day, chunk, index, filename_stub, raw_data_location + year, month, day, chunk, index, filename_stub, raw_data_location, + keep_zip=keep_zip, ) return diff --git a/src/nemosis/downloader.py b/src/nemosis/downloader.py index 8b423c0..93f712c 100644 --- a/src/nemosis/downloader.py +++ b/src/nemosis/downloader.py @@ -27,7 +27,7 @@ }) -def run(year, month, day, chunk, index, filename_stub, down_load_to): +def run(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=False): """This function""" url = defaults.aemo_mms_url @@ -36,15 +36,15 @@ def run(year, month, day, chunk, index, filename_stub, down_load_to): # Perform the download, unzipping saving of the file try: - download_unzip_csv(url_formatted, down_load_to) + download_unzip_csv(url_formatted, down_load_to, keep_zip=keep_zip) except Exception as e: if chunk == 1: logger.warning(f"{filename_stub} not downloaded ({e})") -def 
run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to): +def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=False): if day is None: - run(year, month, day, chunk, index, filename_stub, down_load_to) + run(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=keep_zip) else: try: filename_stub = "BIDMOVE_COMPLETE_{year}{month}{day}".format(year=year, month=month, day=day) @@ -52,42 +52,42 @@ def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to): filename_stub, defaults.current_data_page_urls["BIDDING"]) _download_and_unpack_bid_move_complete_files( - download_url, down_load_to + download_url, down_load_to, keep_zip=keep_zip ) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") -def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, down_load_to): +def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=False): try: filename_stub = "PUBLIC_DAILY_{year}{month}{day}".format(year=year, month=month, day=day) download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["DAILY_REGION_SUMMARY"]) _download_and_unpack_next_region_tables( - download_url, down_load_to + download_url, down_load_to, keep_zip=keep_zip ) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") -def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down_load_to): +def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=False): try: filename_stub = "PUBLIC_NEXT_DAY_DISPATCH_{year}{month}{day}".format(year=year, month=month, day=day) download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["NEXT_DAY_DISPATCHLOAD"]) - _download_and_unpack_next_dispatch_load_files_complete_files(download_url, down_load_to) + _download_and_unpack_next_dispatch_load_files_complete_files(download_url, 
down_load_to, keep_zip=keep_zip) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") -def run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, down_load_to): +def run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=False): try: download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["INTERMITTENT_GEN_SCADA"]) - _download_and_unpack_intermittent_gen_scada_file(download_url, down_load_to) + _download_and_unpack_intermittent_gen_scada_file(download_url, down_load_to, keep_zip=keep_zip) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") @@ -100,118 +100,134 @@ def _get_current_url(filename_stub, current_page_url): def _download_and_unpack_bid_move_complete_files( - download_url, down_load_to + download_url, down_load_to, keep_zip=False ): - zip_local_path = download_to_dir(download_url, down_load_to) - with zipfile.ZipFile(zip_local_path) as zipped_file: - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 2 - ) - csv_file = zipped_file.open(file_name) - BIDDAYOFFER_D = pd.read_csv( - csv_file, header=1, nrows=start_row_second_table - 3, dtype=str - ) - BIDDAYOFFER_D.to_csv( - os.path.join( - down_load_to, - "PUBLIC_DVD_BIDDAYOFFER_D_" + file_name[24:32] + ".csv", - ), - index=False, - ) - csv_file = zipped_file.open(file_name) - BIDPEROFFER_D = pd.read_csv( - csv_file, header=start_row_second_table - 1, dtype=str - )[:-1] - BIDPEROFFER_D.to_csv( - os.path.join( - down_load_to, - "PUBLIC_DVD_BIDPEROFFER_D_" + file_name[24:32] + ".csv", - ), - index=False, - ) + zip_local_path, downloaded = download_to_dir(download_url, down_load_to) + try: + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 
0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 2 + ) + csv_file = zipped_file.open(file_name) + BIDDAYOFFER_D = pd.read_csv( + csv_file, header=1, nrows=start_row_second_table - 3, dtype=str + ) + BIDDAYOFFER_D.to_csv( + os.path.join( + down_load_to, + "PUBLIC_DVD_BIDDAYOFFER_D_" + file_name[24:32] + ".csv", + ), + index=False, + ) + csv_file = zipped_file.open(file_name) + BIDPEROFFER_D = pd.read_csv( + csv_file, header=start_row_second_table - 1, dtype=str + )[:-1] + BIDPEROFFER_D.to_csv( + os.path.join( + down_load_to, + "PUBLIC_DVD_BIDPEROFFER_D_" + file_name[24:32] + ".csv", + ), + index=False, + ) + finally: + if downloaded and not keep_zip and os.path.isfile(zip_local_path): + os.unlink(zip_local_path) def _download_and_unpack_next_region_tables( - download_url, down_load_to + download_url, down_load_to, keep_zip=False ): - zip_local_path = download_to_dir(download_url, down_load_to) - with zipfile.ZipFile(zip_local_path) as zipped_file: - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 2 - ) - start_row_third_table = _find_start_row_nth_table( - zipped_file, file_name, 3 - ) - csv_file = zipped_file.open(file_name) - DAILY_REGION_SUMMARY = pd.read_csv( - csv_file, header=start_row_second_table - 1, - nrows=start_row_third_table - start_row_second_table - 1, dtype=str - ) - DAILY_REGION_SUMMARY.to_csv( - os.path.join( - down_load_to, - "PUBLIC_DAILY_REGION_SUMMARY_" + file_name[13:21] + ".csv", - ), - index=False, - ) + zip_local_path, downloaded = download_to_dir(download_url, down_load_to) + try: + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 2 + ) + start_row_third_table = 
_find_start_row_nth_table( + zipped_file, file_name, 3 + ) + csv_file = zipped_file.open(file_name) + DAILY_REGION_SUMMARY = pd.read_csv( + csv_file, header=start_row_second_table - 1, + nrows=start_row_third_table - start_row_second_table - 1, dtype=str + ) + DAILY_REGION_SUMMARY.to_csv( + os.path.join( + down_load_to, + "PUBLIC_DAILY_REGION_SUMMARY_" + file_name[13:21] + ".csv", + ), + index=False, + ) + finally: + if downloaded and not keep_zip and os.path.isfile(zip_local_path): + os.unlink(zip_local_path) def _download_and_unpack_next_dispatch_load_files_complete_files( - download_url, down_load_to + download_url, down_load_to, keep_zip=False ): - zip_local_path = download_to_dir(download_url, down_load_to) - with zipfile.ZipFile(zip_local_path) as zipped_file: - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 2 - ) - csv_file = zipped_file.open(file_name) - NEXT_DAY_DISPATCHLOAD = pd.read_csv( - csv_file, header=1, nrows=start_row_second_table - 3, dtype=str - ) - NEXT_DAY_DISPATCHLOAD.to_csv( - os.path.join( - down_load_to, - "PUBLIC_NEXT_DAY_DISPATCHLOAD_" + file_name[25:33] + ".csv", - ), - index=False, - ) + zip_local_path, downloaded = download_to_dir(download_url, down_load_to) + try: + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 2 + ) + csv_file = zipped_file.open(file_name) + NEXT_DAY_DISPATCHLOAD = pd.read_csv( + csv_file, header=1, nrows=start_row_second_table - 3, dtype=str + ) + NEXT_DAY_DISPATCHLOAD.to_csv( + os.path.join( + down_load_to, + "PUBLIC_NEXT_DAY_DISPATCHLOAD_" + file_name[25:33] + ".csv", + ), + index=False, + ) + finally: + if downloaded and not keep_zip and os.path.isfile(zip_local_path): + 
os.unlink(zip_local_path) def _download_and_unpack_intermittent_gen_scada_file( - download_url, down_load_to + download_url, down_load_to, keep_zip=False ): - zip_local_path = download_to_dir(download_url, down_load_to) - with zipfile.ZipFile(zip_local_path) as zipped_file: - - file_name = zipped_file.namelist()[ - 0 - ] # Just one file so we can pull it out of the list using 0 - start_row_second_table = _find_start_row_nth_table( - zipped_file, file_name, 1 - ) - csv_file = zipped_file.open(file_name) - data = pd.read_csv( - csv_file, header=1, dtype=str - )[:-1] - data.to_csv( - os.path.join( - down_load_to, - "PUBLIC_NEXT_DAY_INTERMITTENT_GEN_SCADA_" + file_name[39:47] + ".csv", - ), - index=False, - ) + zip_local_path, downloaded = download_to_dir(download_url, down_load_to) + try: + with zipfile.ZipFile(zip_local_path) as zipped_file: + + file_name = zipped_file.namelist()[ + 0 + ] # Just one file so we can pull it out of the list using 0 + start_row_second_table = _find_start_row_nth_table( + zipped_file, file_name, 1 + ) + csv_file = zipped_file.open(file_name) + data = pd.read_csv( + csv_file, header=1, dtype=str + )[:-1] + data.to_csv( + os.path.join( + down_load_to, + "PUBLIC_NEXT_DAY_INTERMITTENT_GEN_SCADA_" + file_name[39:47] + ".csv", + ), + index=False, + ) + finally: + if downloaded and not keep_zip and os.path.isfile(zip_local_path): + os.unlink(zip_local_path) def _find_start_row_nth_table(sub_folder_zipfile, file_name, n): @@ -232,7 +248,7 @@ def _find_start_row_nth_table(sub_folder_zipfile, file_name, n): -def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to): +def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=False): """This function""" # Add the year and month information to the generic AEMO data url @@ -242,10 +258,10 @@ def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to): ) # Perform the download, unzipping saving of the file try: - 
download_unzip_csv(url_formatted_latest, down_load_to) + download_unzip_csv(url_formatted_latest, down_load_to, keep_zip=keep_zip) except Exception: try: - download_unzip_csv(url_formatted_hist, down_load_to) + download_unzip_csv(url_formatted_hist, down_load_to, keep_zip=keep_zip) except Exception as e: # FCAS csvs are bundled in 30 minute bundles # Check if the csv exists before warning @@ -255,30 +271,35 @@ def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to): def download_to_dir(url, down_load_to, force_redo=False): """ - This function downloads a file from a url to a folder. - It streams it, so that large files do not fill up memory. - An exception is thrown if the url does not exist. + Download a file into `down_load_to`, deriving the filename from + the URL. Returns a (path, downloaded) tuple: `path` is the file's + path inside `down_load_to`, `downloaded` is True if this call actually fetched + over the network and False if the file was already in place and + re-used. + + Streams the response so large files don't have to fit in memory. """ url = url.replace('#', '%23') filename = url.split('/')[-1].split('?')[0] path = os.path.join(down_load_to, filename) - download_to_path(url, path, force_redo=force_redo) - return path + downloaded = download_to_path(url, path, force_redo=force_redo) + return path, downloaded def download_to_path(url, path_and_name, force_redo=False): """ - This function downloads a file from a url to a specific path. - It streams it, so that large files do not fill up memory. + Download a file from `url` to `path_and_name`. Returns True if a + new network fetch occurred, False if the destination already + existed and was re-used. Idempotent — if the destination already exists, the function - returns without re-downloading unless `force_redo=True`. On a - write failure mid-stream, the partial output file is removed - before the exception propagates. + short-circuits unless `force_redo=True`. 
On a write failure + mid-stream, the partial output file is removed before the + exception propagates. """ url = url.replace('#', '%23') if os.path.isfile(path_and_name) and not force_redo: - return + return False with session.get(url, stream=True) as response: response.raise_for_status() @@ -291,16 +312,31 @@ def download_to_path(url, path_and_name, force_redo=False): if os.path.isfile(path_and_name): os.unlink(path_and_name) raise + return True -def download_unzip_csv(url, down_load_to): +def download_unzip_csv(url, down_load_to, keep_zip=False): """ - This function downloads a zipped csv using a url, - extracts the csv and saves it a specified location + Download a zipped csv from a URL, extract its contents into + `down_load_to`, and (per `keep_zip`) retain the zip on disk. + + `keep_zip=False` (default) cleans up the zip after extracting, + leaving only the extracted CSV in the cache directory. Cleanup + only touches zips this call actually downloaded — pre-existing + zips (from a previous call, or another concurrent process) are + left alone. + + `keep_zip=True` addresses #56: a cached zip on disk so subsequent + calls can re-extract without re-downloading (useful on slow + connections). 
""" - zip_local_path = download_to_dir(url, down_load_to) - with zipfile.ZipFile(zip_local_path) as z: - z.extractall(down_load_to) + zip_local_path, downloaded = download_to_dir(url, down_load_to) + try: + with zipfile.ZipFile(zip_local_path) as z: + z.extractall(down_load_to) + finally: + if downloaded and not keep_zip and os.path.isfile(zip_local_path): + os.unlink(zip_local_path) def download_csv(url, path_and_name): diff --git a/tests/end_to_end_table_tests/test_cache_compiler.py b/tests/end_to_end_table_tests/test_cache_compiler.py index 181ec54..2b7c1f7 100644 --- a/tests/end_to_end_table_tests/test_cache_compiler.py +++ b/tests/end_to_end_table_tests/test_cache_compiler.py @@ -4,13 +4,13 @@ Verifies (a) the round-trip through cache → dynamic_data_compiler preserves typed columns, (b) `select_columns` narrows the cached file itself, not just the returned frame, (c) cache directory is auto-created when missing, -(d) keep_csv=True is the default behaviour, and (e) partial cache files -are cleaned up when a feather/parquet write fails mid-flight. +(d) keep_csv / keep_zip defaults and opt-in paths, and (e) partial cache +files are cleaned up when a feather/parquet write fails mid-flight. """ import pandas as pd import pytest -from nemosis import cache_compiler, data_fetch_methods, dynamic_data_compiler +from nemosis import cache_compiler, data_fetch_methods, defaults, dynamic_data_compiler from nemosis.custom_errors import UserInputError @@ -92,12 +92,12 @@ def test_raises_when_cache_path_is_a_file(nemosis_fixture): ) -def test_keep_csv_true_by_default_keeps_fetched_csv(nemosis_fixture): +def test_keep_csv_true_retains_csv(nemosis_fixture): """When cache_compiler actually has to fetch (no existing feather, so - the code path that downloads + extracts a CSV runs), the default - keep_csv=True must leave the extracted CSV on disk alongside the - feather. rebuild=True forces the fetch path so this test isn't - dependent on tmp_path being empty at start. 
+ the code path that downloads + extracts a CSV runs), `keep_csv=True` + must leave the extracted CSV on disk alongside the feather. + rebuild=True forces the fetch path so this test isn't dependent on + tmp_path being empty at start. AEMO zips contain CSV files with an uppercase .CSV extension — NEMOSIS handles this internally with [cC][sS][vV] globs and the @@ -107,25 +107,25 @@ def test_keep_csv_true_by_default_keeps_fetched_csv(nemosis_fixture): table_name="DISPATCHPRICE", raw_data_location=str(nemosis_fixture), rebuild=True, - # no keep_csv kwarg — exercises the default + keep_csv=True, ) csv_files = list(nemosis_fixture.glob("*DISPATCHPRICE*.[Cc][Ss][Vv]")) - assert csv_files, "default keep_csv=True should retain the fetched CSV" + assert csv_files, "keep_csv=True should retain the fetched CSV" -def test_keep_csv_false_removes_fetched_csv(nemosis_fixture): - """Mirror of the above with the override — verifies the opt-out - path still works (the source-side delete in _dynamic_data_fetch_loop - fires only when keep_csv is False).""" +def test_keep_csv_default_is_false(nemosis_fixture): + """Default behaviour — keep_csv=False removes the extracted CSV + after the typed feather is written, leaving only the feather in + the cache. 
Users who want raw retention opt in via keep_csv=True.""" cache_compiler( start_time=START, end_time=END, table_name="DISPATCHPRICE", raw_data_location=str(nemosis_fixture), rebuild=True, - keep_csv=False, + # no keep_csv kwarg — exercises the default ) csv_files = list(nemosis_fixture.glob("*DISPATCHPRICE*.[Cc][Ss][Vv]")) - assert not csv_files, "keep_csv=False should remove the fetched CSV" + assert not csv_files, f"default keep_csv=False should remove CSV; found: {csv_files}" def test_existing_feather_means_no_csv_is_fetched(nemosis_fixture): @@ -155,6 +155,77 @@ def test_existing_feather_means_no_csv_is_fetched(nemosis_fixture): ) +def test_keep_zip_default_is_false(nemosis_fixture): + """Default behaviour — keep_zip=False removes the downloaded zip + after extracting the CSV. Lean cache; only the typed feather (and + optional CSV if keep_csv=True) remains.""" + cache_compiler( + start_time=START, end_time=END, + table_name="DISPATCHPRICE", + raw_data_location=str(nemosis_fixture), + rebuild=True, + # no keep_zip kwarg — exercises the default + ) + zip_files = list(nemosis_fixture.glob("*.zip")) + assert not zip_files, f"default keep_zip=False should remove zips; found: {zip_files}" + + +def test_keep_zip_true_retains_zip(nemosis_fixture): + """keep_zip=True is the opt-in for #56's slow-internet use case — + the AEMO archive zip stays on disk after extraction so subsequent + runs can re-extract without re-downloading.""" + cache_compiler( + start_time=START, end_time=END, + table_name="DISPATCHPRICE", + raw_data_location=str(nemosis_fixture), + rebuild=True, + keep_zip=True, + ) + zip_files = list(nemosis_fixture.glob("*.zip")) + assert zip_files, "keep_zip=True should retain the downloaded zip" + + +def test_cached_zip_extracts_without_network(nemosis_fixture, monkeypatch): + """The #56 benefit in action: with keep_zip=True on a first call, + a subsequent call that needs the same CSV but finds the feather + missing should re-extract from the cached zip locally 
without + hitting nemweb. Proves it by breaking the AEMO URL after the first + call — if the second call tried to fetch, the connection to the + dead address would fail.""" + # First call — populate cache with zip retained + cache_compiler( + start_time=START, end_time=END, + table_name="DISPATCHPRICE", + raw_data_location=str(nemosis_fixture), + rebuild=True, + keep_zip=True, + ) + feather_files = list(nemosis_fixture.glob("*.feather")) + zip_files = list(nemosis_fixture.glob("*.zip")) + assert feather_files and zip_files + + # Delete the feather so the loop has to call _download_data again, + # but leave the zip in place so the lower-level download_to_path + # short-circuits on the cached file. + for f in feather_files: + f.unlink() + + # Point the URL at a dead address — any actual network call would fail. + monkeypatch.setattr(defaults, "aemo_mms_url", "http://127.0.0.1:1/dead/{}/{}/{}/{}.zip") + + # Should succeed using the cached zip, no network required. + cache_compiler( + start_time=START, end_time=END, + table_name="DISPATCHPRICE", + raw_data_location=str(nemosis_fixture), + rebuild=True, + keep_zip=True, + ) + + # Feather rebuilt from the cached zip + assert list(nemosis_fixture.glob("*.feather")), "cache should be rebuilt from cached zip" + + @pytest.mark.parametrize("fformat", ["feather", "parquet"]) def test_write_to_format_cleans_up_partial_file_on_failure(tmp_path, monkeypatch, fformat): """A mid-write failure (e.g. 
disk full) used to leave a partial, diff --git a/tests/test_downloader.py b/tests/test_downloader.py index 54b4466..5669e46 100644 --- a/tests/test_downloader.py +++ b/tests/test_downloader.py @@ -58,3 +58,77 @@ def test_download_xl_raises_on_404(aemo_mock_server): f"{aemo_mock_server}/does/not/exist.xlsx", f"{tmp}/out.xlsx", ) + + +# --------------------------------------------------------------------------- +# Streaming helpers: download_to_path / download_to_dir idempotency +# +# The (path, downloaded) return signal is what keeps keep_zip=False safe for +# concurrent same-cache use: cleanup only fires for zips THIS call actually +# wrote to disk; pre-existing zips from another process are reported as +# `downloaded=False` and left alone. +# --------------------------------------------------------------------------- + +def test_download_to_path_returns_true_on_first_call(aemo_mock_server, tmp_path): + target = tmp_path / "out.csv" + url = f"{aemo_mock_server}/-/media/files/electricity/nem/settlements_and_payments/settlements/auction-reports/archive/ancillary-services-market-causer-pays-variables-file.csv" + + downloaded = downloader.download_to_path(url, str(target)) + + assert downloaded is True + assert target.is_file() + assert target.stat().st_size > 0 + + +def test_download_to_path_returns_false_when_file_exists(aemo_mock_server, tmp_path): + """Idempotency signal: a second call with the destination already + present must report downloaded=False and skip the network. 
This is + the contract keep_zip=False relies on to avoid deleting another + process's pre-existing zip.""" + target = tmp_path / "out.csv" + target.write_bytes(b"pre-existing contents - pretend another process wrote this") + original_bytes = target.read_bytes() + + url = f"{aemo_mock_server}/-/media/files/electricity/nem/settlements_and_payments/settlements/auction-reports/archive/ancillary-services-market-causer-pays-variables-file.csv" + + downloaded = downloader.download_to_path(url, str(target)) + + assert downloaded is False + assert target.read_bytes() == original_bytes, "existing file must not be overwritten" + + +def test_download_to_path_force_redo_overrides_existing(aemo_mock_server, tmp_path): + """force_redo=True bypasses the idempotency check and re-downloads + even if the destination already exists. Useful when the user knows + the cached file is stale.""" + target = tmp_path / "out.csv" + target.write_bytes(b"stale contents") + + url = f"{aemo_mock_server}/-/media/files/electricity/nem/settlements_and_payments/settlements/auction-reports/archive/ancillary-services-market-causer-pays-variables-file.csv" + + downloaded = downloader.download_to_path(url, str(target), force_redo=True) + + assert downloaded is True + assert target.read_bytes() != b"stale contents", "force_redo=True should overwrite" + + +def test_download_to_dir_forwards_force_redo(aemo_mock_server, tmp_path): + """Regression test for the PR67 bug where download_to_dir accepted + force_redo but never forwarded it to download_to_path. 
With the bug, + force_redo=True would silently no-op.""" + url = f"{aemo_mock_server}/-/media/files/electricity/nem/settlements_and_payments/settlements/auction-reports/archive/ancillary-services-market-causer-pays-variables-file.csv" + + # First call establishes the file + path, downloaded_first = downloader.download_to_dir(url, str(tmp_path)) + assert downloaded_first is True + + # Second call without force_redo: should short-circuit + _, downloaded_second = downloader.download_to_dir(url, str(tmp_path)) + assert downloaded_second is False + + # Third call WITH force_redo: must re-download (bug regression check) + _, downloaded_third = downloader.download_to_dir(url, str(tmp_path), force_redo=True) + assert downloaded_third is True, ( + "force_redo=True should re-download; if this fails, download_to_dir " + "may have stopped forwarding force_redo to download_to_path" + ) From 08d1b6d8b61fcfe6807ba19667e2a8c7175e89e7 Mon Sep 17 00:00:00 2001 From: nick-gorman Date: Mon, 25 May 2026 14:45:10 +1000 Subject: [PATCH 4/4] Fix stale download_xl reference in 404 test PR #84 renamed download_xl to download_xlsx but this test was missed. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_downloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_downloader.py b/tests/test_downloader.py index 5669e46..8692989 100644 --- a/tests/test_downloader.py +++ b/tests/test_downloader.py @@ -51,10 +51,10 @@ def test_download_csv_raises_on_404(aemo_mock_server): ) -def test_download_xl_raises_on_404(aemo_mock_server): +def test_download_xlsx_raises_on_404(aemo_mock_server): with tempfile.TemporaryDirectory() as tmp: with pytest.raises(requests.HTTPError): - downloader.download_xl( + downloader.download_xlsx( f"{aemo_mock_server}/does/not/exist.xlsx", f"{tmp}/out.xlsx", )