Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
512709e
add verif to develop
rolfhm Jan 27, 2026
ff2a8a8
Merge branch 'develop' of https://github.com/ecmwf/WeatherGenerator i…
rolfhm Jan 30, 2026
f5cad2c
Merge branch 'develop' into add-support-for-met-norway-verification-o…
rolfhm Feb 5, 2026
e7fb3ee
update to respect core developments and default units
rolfhm Feb 9, 2026
4dca441
initial reworking
enssow Feb 17, 2026
ae12b08
Merge remote-tracking branch 'ecmwf/develop' into sorcha/dev/1880-ver…
enssow Feb 17, 2026
786bc56
working - TODO: delte uncesseary
enssow Feb 19, 2026
19da58c
fixes to windpseed
enssow Feb 19, 2026
34bd403
adjusting template
enssow Feb 19, 2026
ba9ca9f
adding multi stream handling
enssow Feb 20, 2026
b88b037
first commit
enssow Feb 23, 2026
703b723
removing verif as a package
enssow Feb 23, 2026
0aa09b3
Merge remote-tracking branch 'ecmwf/develop' into sorcha/dev/1880-ver…
enssow Feb 23, 2026
f3c3a9a
fixing pyporject
enssow Feb 23, 2026
f4e2484
Merge remote-tracking branch 'ecmwf/develop' into sorcha/dev/1880-ver…
enssow Feb 27, 2026
c040701
pinning eathkit
enssow Feb 27, 2026
991c5d2
add compat/join arguments
enssow Feb 27, 2026
8bebaa1
adjjsting to save all samples
enssow Feb 27, 2026
6a48b8a
linting
enssow Feb 27, 2026
63353c3
Merge remote-tracking branch 'ecmwf/develop' into sorcha/dev/1880-ver…
enssow Mar 3, 2026
a80892c
fxing attributes
enssow Mar 9, 2026
5770fb2
Merge remote-tracking branch 'ecmwf/develop' into sorcha/dev/1880-ver…
enssow Mar 9, 2026
90719b5
setting at least 1d for single fsteps
enssow Mar 12, 2026
ab6df3a
fixing duplication of location
enssow Mar 13, 2026
ea06e9e
Merge remote-tracking branch 'ecmwf/develop' into sorcha/dev/1880-ver…
enssow Mar 13, 2026
ed02cb4
change to match rest of export package
enssow Mar 13, 2026
2df2828
linting
enssow Mar 13, 2026
bc3a257
Merge pull request #1 from enssow/sorcha/dev/1880-verifparser
rolfhm Mar 13, 2026
518d716
declare some variables
rolfhm Mar 13, 2026
2e2e0f1
ruff did not like
rolfhm Mar 13, 2026
f21dc0b
declare self.channels correctly
rolfhm Mar 13, 2026
d79d6e0
Merge branch 'develop' into add-support-for-met-norway-verification-o…
rolfhm Mar 17, 2026
5987814
update some code missed in merge
rolfhm Mar 17, 2026
2ccc608
Merge branch 'develop' into add-support-for-met-norway-verification-o…
rolfhm Mar 18, 2026
30de27f
revert to earthkit-data 0.17.0
rolfhm Mar 19, 2026
04483bd
Merge branch 'develop' into add-support-for-met-norway-verification-o…
rolfhm Mar 26, 2026
c89c58d
_interval_start andddddddddddddd end
enssow Apr 9, 2026
f4934e1
linting
enssow Apr 9, 2026
b60d268
Merge pull request #2 from enssow/sorcha/dev/verif-debug
rolfhm Apr 10, 2026
525128e
Merge branch 'develop' into add-support-for-met-norway-verification-o…
enssow Apr 17, 2026
7b4f87a
Apply suggestion from @enssow check consistent grid
rolfhm Apr 17, 2026
c7a58d3
linting
rolfhm Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions config/evaluate/config_zarr2verif.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# List of variables to compare between WeatherGenerator output and MetNor observation files
# To add more varaibles use the same formate for the variable using the `var` field to put the
# name of the variable in the WeatherGenerator dataset and units for each stream in `wg_uni`
# Additionally, add the chosen variable to required channels in `verif_parser.py` L92

variables:
2t:
var: 2t
long: 2 meter temperature
wg_unit: {CERRA: K,
MEPS: K,
NORA3: K,
ERA5: K,
DEFAULT: K}
verif_unit: K
obs_name: air_temperature
obs_units: K
level_type: sfc

sp:
var: sp
long: Surface pressure
wg_unit: {CERRA: Pa,
MEPS: Pa,
NORA3: Pa,
ERA5: Pa,
DEFAULT: Pa}
verif_unit: Pa
obs_name: surface_air_pressure
obs_units: Pa
level_type: sfc

tp:
var: tp
long: Total precipitation amount
wg_unit: {CERRA: kg/m^2,
MEPS: kg/m^2,
NORA3: kg/m^2,
ERA5: m,
DEFAULT: kg/m^2}
verif_unit: kg/m^2
obs_name: precipitation_amount_1h
obs_units: kg/m^2
level_type: sfc


msl:
var: mslp
long: Mean sea level pressure
wg_unit: {CERRA: Pa,
MEPS: Pa,
NORA3: Pa,
ERA5: Pa,
DEFAULT: Pa}
verif_unit: Pa
obs_name: surface_air_pressure #check with Rolf
obs_units: kg/m^2
level_type: sfc


10si:
var: 10si # derived channel
long: wind speed
wg_unit: {CERRA: m/s,
MEPS: m/s,
NORA3: m/s,
ERA5: m/s,
DEFAULT: m/s}
verif_unit: m/s
obs_name: wind_speed
obs_units: m/s
level_type: sfc


coordinates:
sfc:
lat: latitude
lon: longitude
forecast_step: leadtime
forecast_reference_time: time
ncells: ncells
pl:
#not needed
pressure_level: pressure
lat: latitude
lon: longitude
forecast_step: leadtime
forecast_reference_time: time
ncells: ncells

dimensions:
lat:
verif: latitude
std: latitude
verif_unit: degrees_north
lon:
verif: longitude
std: longitude
verif_unit: degrees_east
pressure_level:
verif: pressure
std: pressure
verif_unit: hPa
forecast_reference_time:
verif: time
std: forecast_reference_time
forecast_step:
verif: leadtime
std: forecast_period
long: time since forecast_reference_time
verif_unit: hour
ncells:
verif: ncells
std: ncells
2 changes: 1 addition & 1 deletion packages/evaluate/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies = [
"eckitlib==1.32.3.7",
"earthkit-data==0.17.0",
"earthkit-utils==0.1.2"
]
]

[dependency-groups]
dev = [
Expand Down
6 changes: 3 additions & 3 deletions packages/evaluate/src/weathergen/evaluate/export/cf_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ def __init__(self, config, **kwargs):
grid_type : str
Type of grid ('regular' or 'gaussian').
"""

for k, v in kwargs.items():
setattr(self, k, v)

self.config = config
self.file_extension = _get_file_extension(self.output_format)
self.fstep_hours = np.timedelta64(self.fstep_hours, "h")
Expand Down Expand Up @@ -96,10 +94,12 @@ def _get_file_extension(output_format: str) -> str:
"""
if output_format == "netcdf":
return "nc"
if output_format == "verif":
return "nc"
elif output_format == "quaver":
return "grib"
else:
raise ValueError(
f"Unsupported output format: {output_format},"
"supported formats are ['netcdf', 'DWD', 'quaver']"
"supported formats are ['netcdf', 'verif', 'quaver']"
)
200 changes: 112 additions & 88 deletions packages/evaluate/src/weathergen/evaluate/export/export_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ def get_data_worker(args: tuple) -> tuple[int, int, xr.DataArray]:
coords_arr = np.asarray(ds_group["coords"]) # (npoints, 2)
times_arr = np.asarray(ds_group["times"]).astype("datetime64[ns]") # (npoints,)
channels = list(ds_group.attrs["channels"])
source_interval_start = np.asarray(ds_group.attrs["source_interval"]["start"]).astype(
"datetime64[ns]"
)
source_interval_end = np.asarray(ds_group.attrs["source_interval"]["end"]).astype(
"datetime64[ns]"
)

# Build a lightweight xarray DataArray with the same structure
# that process_sample / assign_coords expects:
Expand All @@ -75,6 +81,8 @@ def get_data_worker(args: tuple) -> tuple[int, int, xr.DataArray]:
"valid_time": ("ipoint", times_arr),
"lat": ("ipoint", coords_arr[:, 0]),
"lon": ("ipoint", coords_arr[:, 1]),
"source_interval_start": source_interval_start,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be computed from source, see: #2221

"source_interval_end": source_interval_end,
},
)

Expand Down Expand Up @@ -268,6 +276,13 @@ def get_ref_times(fname_zarr, stream, samples, fstep_hours, n_processes) -> list
return ref_times


def get_streams(stream, fname_zarr):
with zarrio_reader(fname_zarr) as zio:
zio_streams = zio.streams
streams = zio_streams if stream is None else [stream]
return streams


def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
"""
Retrieve data from Zarr store and export to the requested format.
Expand Down Expand Up @@ -303,96 +318,105 @@ def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
fname_zarr = get_model_results(run_id, epoch, rank)
fsteps = get_fsteps(fsteps, fname_zarr)
samples = get_samples(samples, fname_zarr)
grid_type = get_grid_type(data_type, stream, fname_zarr)
channels = get_channels(channels, stream, fname_zarr)
ref_times = get_ref_times(fname_zarr, stream, samples, fstep_hours, n_processes)

kwargs["grid_type"] = grid_type
kwargs["channels"] = channels
kwargs["data_type"] = data_type

parser = CfParserFactory.get_parser(config=config, **kwargs)

n_fsteps = len(fsteps)
total_tasks = len(samples) * n_fsteps
streams = get_streams(stream, fname_zarr)
for stream in streams:
grid_type = get_grid_type(data_type, stream, fname_zarr)
channels = get_channels(channels, stream, fname_zarr)
ref_times = get_ref_times(fname_zarr, stream, samples, fstep_hours, n_processes)
kwargs["grid_type"] = grid_type
kwargs["channels"] = channels
kwargs["data_type"] = data_type

parser = CfParserFactory.get_parser(config=config, **kwargs)

n_fsteps = len(fsteps)
total_tasks = len(samples) * n_fsteps

# Batch size in *samples*. Limits how many samples can be in-flight at once,
# bounding peak memory while still allowing read/write overlap within each batch.
batch_size = max(1, n_processes * 2)
n_batches = (len(samples) + batch_size - 1) // batch_size

_logger.info(
f"Exporting {len(samples)} samples × {n_fsteps} fsteps "
f"({total_tasks} total tasks) in {n_batches} batch(es) of up to "
f"{batch_size} samples, using {n_processes} workers. "
f"Reading and writing are interleaved within each batch."
)

# Batch size in *samples*. Limits how many samples can be in-flight at once,
# bounding peak memory while still allowing read/write overlap within each batch.
batch_size = max(1, n_processes * 2)
n_batches = (len(samples) + batch_size - 1) // batch_size
# Initialise each worker with the zarr path so it is resolved only once.
with Pool(
processes=n_processes,
initializer=_init_worker,
initargs=(fname_zarr,),
) as pool:
samples_written = 0

for batch_idx in range(n_batches):
batch_start = batch_idx * batch_size
batch_end = min(batch_start + batch_size, len(samples))
batch_samples = samples[batch_start:batch_end]
batch_ref_times = ref_times[batch_start:batch_end]

# Map sample -> index within this batch for ref_times lookup.
sample_to_batch_idx = {s: i for i, s in enumerate(batch_samples)}

batch_tasks = [
(sample, fstep, stream, data_type)
for sample in batch_samples
for fstep in fsteps
]

_logger.info(
f"Batch {batch_idx + 1}/{n_batches}: "
f"samples {batch_start}–{batch_end - 1} "
f"({len(batch_samples)} samples, {len(batch_tasks)} tasks)"
)

_logger.info(
f"Exporting {len(samples)} samples × {n_fsteps} fsteps "
f"({total_tasks} total tasks) in {n_batches} batch(es) of up to "
f"{batch_size} samples, using {n_processes} workers. "
f"Reading and writing are interleaved within each batch."
)
# Interleaved read/write: as soon as all fsteps for a sample
# arrive, write it immediately while workers continue reading.
sample_results: dict[int, list] = defaultdict(list)
batch_written = 0

# Initialise each worker with the zarr path so it is resolved only once.
with Pool(
processes=n_processes,
initializer=_init_worker,
initargs=(fname_zarr,),
) as pool:
samples_written = 0

for batch_idx in range(n_batches):
batch_start = batch_idx * batch_size
batch_end = min(batch_start + batch_size, len(samples))
batch_samples = samples[batch_start:batch_end]
batch_ref_times = ref_times[batch_start:batch_end]

# Map sample -> index within this batch for ref_times lookup.
sample_to_batch_idx = {s: i for i, s in enumerate(batch_samples)}

batch_tasks = [
(sample, fstep, stream, data_type) for sample in batch_samples for fstep in fsteps
]

_logger.info(
f"Batch {batch_idx + 1}/{n_batches}: "
f"samples {batch_start}–{batch_end - 1} "
f"({len(batch_samples)} samples, {len(batch_tasks)} tasks)"
)

# Interleaved read/write: as soon as all fsteps for a sample
# arrive, write it immediately while workers continue reading.
sample_results: dict[int, list] = defaultdict(list)
batch_written = 0

pbar = tqdm(
total=len(batch_tasks),
desc=f" Batch {batch_idx + 1}/{n_batches}",
)

for sample, _fstep, data in pool.imap_unordered(
get_data_worker, batch_tasks, chunksize=max(1, n_fsteps)
):
sample_results[sample].append(data)
pbar.update(1)

# Check if this sample is complete (all fsteps received).
if len(sample_results[sample]) == n_fsteps:
b_idx = sample_to_batch_idx[sample]
ref_time = batch_ref_times[b_idx]
results_iter = iter(sample_results[sample])
parser.process_sample(results_iter, ref_time=ref_time)

# Free memory immediately.
del sample_results[sample]
batch_written += 1

pbar.close()

samples_written += batch_written
if batch_written != len(batch_samples):
_logger.error(
f"Batch {batch_idx + 1}: expected {len(batch_samples)} "
f"samples but only wrote {batch_written}. "
f"Incomplete: {list(sample_results.keys())}"
pbar = tqdm(
total=len(batch_tasks),
desc=f" Batch {batch_idx + 1}/{n_batches}",
)

# Free any remaining refs before next batch.
del sample_results

_logger.info(f"Export complete. Wrote {samples_written}/{len(samples)} samples.")
processed_samples = []

for sample, _fstep, data in pool.imap_unordered(
get_data_worker, batch_tasks, chunksize=max(1, n_fsteps)
):
sample_results[sample].append(data)
pbar.update(1)

# Check if this sample is complete (all fsteps received).
if len(sample_results[sample]) == n_fsteps:
b_idx = sample_to_batch_idx[sample]
ref_time = batch_ref_times[b_idx]
results_iter = iter(sample_results[sample])
processed = parser.process_sample(results_iter, ref_time=ref_time)
processed_samples.append(processed)

# Free memory immediately.
del sample_results[sample]
batch_written += 1

# Only save here if need to merge samples, otherwise saved in process_sample
if processed_samples[0] is not None:
parser.save(processed_samples)
pbar.close()

samples_written += batch_written
if batch_written != len(batch_samples):
_logger.error(
f"Batch {batch_idx + 1}: expected {len(batch_samples)} "
f"samples but only wrote {batch_written}. "
f"Incomplete: {list(sample_results.keys())}"
)

# Free any remaining refs before next batch.
del sample_results

_logger.info(f"Export complete. Wrote {samples_written}/{len(samples)} samples.")
Loading
Loading