Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions configs/test_emit_paths.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"experiment_id": "test_emit_paths",
"suffix_time": false,
"max_duration": 2,
"emit_paths": [
["listeners", "mass", "cell_mass"],
["global_time"]
],
"emitter": "parquet"
}
5 changes: 4 additions & 1 deletion doc/composites.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ The analysis plots located in :py:mod:`~ecoli.analysis.single.blame` can be used
to visualize these updates.

.. warning::
This feature should only be turned for debugging purposes and
This feature should only be turned on for debugging purposes and
only when using the in-memory emitter (see :ref:`ram_emitter`).
The Parquet emitter is unable to serialize many of the objects
contained in process updates (e.g., nested lists of inconsistent
depth like ``[[1], 2]``).

-------------
Initial State
Expand Down
8 changes: 8 additions & 0 deletions doc/workflows.rst
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,14 @@ is a list workflow behaviors enabled in our model to handle unexpected errors.
depends on generation 6, :py:mod:`runscripts.create_variants` depends on
:py:mod:`runscripts.parca`, etc).

.. warning::
The resume option is primarily meant for testing code changes, not config changes.
As such, most edits to the configuration JSON are silently ignored when resuming a workflow.
The only exceptions are changes to resource allocation options (e.g., ``SIM_MEM``),
allowing users to retry failed jobs with higher resource limits without triggering
re-execution of already completed jobs. If you want to change other options, you
must launch a new workflow.

.. _output:

------
Expand Down
89 changes: 66 additions & 23 deletions ecoli/composites/ecoli_master_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from itertools import product

import numpy as np
import polars as pl
import json
import tempfile
import pytest
Expand All @@ -27,6 +28,15 @@
ECOLI_DEFAULT_TOPOLOGY,
)
from ecoli.experiments.ecoli_master_sim import EcoliSim, CONFIG_DIR_PATH
from ecoli.library.parquet_emitter import dataset_sql, create_duckdb_conn


@pytest.fixture
def parquet_out_dir():
    """
    Provide a scratch directory for Parquet emitter output.

    Yields:
        Path of a newly created temporary directory. The directory and its
        contents are removed when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        yield scratch_dir


TRANSLATION_SUPPLY_FLAGS = [
"mechanistic_translation_supply",
Expand Down Expand Up @@ -227,12 +237,8 @@ def test_division(agent_id="0", max_duration=4):
for name, mols in mother_state["unique"].items():
d1_state = daughter_states[0]["unique"][name]
d2_state = daughter_states[1]["unique"][name]
mol_keys = sim.ecoli_experiment.state["agents"]["00"]["unique"][
name
].value.dtype.names
entryState_col = np.where(np.array(mol_keys) == "_entryState")[0][0]
n_mother = sum(mols[entryState_col])
n_daughter = sum(d1_state[entryState_col]) + sum(d2_state[entryState_col])
n_mother = sum(mols["_entryState"])
n_daughter = sum(d1_state["_entryState"]) + sum(d2_state["_entryState"])
if name == "chromosome_domain":
# Chromosome domain 0 is lost after division because
# it has been fully split into child domains 1 and 2
Expand All @@ -241,8 +247,7 @@ def test_division(agent_id="0", max_duration=4):
f"{name}: mother has {n_mother}, daughters have {n_daughter}"
)
# Assert that no unique mol is in both daughters
unique_idx_col = np.where(np.array(mol_keys) == "unique_index")[0][0]
assert not (set(d1_state[unique_idx_col]) & set(d2_state[unique_idx_col]))
assert not (set(d1_state["unique_index"]) & set(d2_state["unique_index"]))

# asserts
final_agents = output[max_duration]["agents"].keys()
Expand Down Expand Up @@ -415,31 +420,37 @@ def plot_spatial_snapshots(data, sim, experiment_dir="ecoli_test"):
)


def test_emit_unique():
def test_emit_unique(parquet_out_dir):
"""
Test that the ``emit_unique`` configuration option works. This can be broken
Verifies that unique molecule data is written to Parquet output when ``emit_unique``
is True. Make sure that every unique molecule type has at least one corresponding
column, including an integer unique index list. This can be broken
if a new process is added whose ports schema connects to a unique molecule
without setting the ``_emit`` property to ``config['emit_unique']``.
"""
sim = EcoliSim.from_file()
sim.config["experiment_id"] = "test_emit_unique_parquet"
sim.config["emit_unique"] = True
sim.config["max_duration"] = 1
sim.config["emitter"] = "parquet"
sim.config["emitter_arg"] = {"out_dir": parquet_out_dir}
sim.build_ecoli()
sim.run()
sim.ecoli_experiment.emitter.finalize()

unique_molecules = sim.ecoli_experiment.state["agents"]["0"]["unique"].inner.keys()
data = sim.query(
[
(
"agents",
"0",
"unique",
)
]
)
for val in data.values():
for unique_mol in unique_molecules:
assert unique_mol in val["agents"]["0"]["unique"]
assert isinstance(val["agents"]["0"]["unique"][unique_mol], list)
history_sql, _, _ = dataset_sql(parquet_out_dir, [sim.experiment_id])
conn = create_duckdb_conn()
t = conn.sql(f"SELECT * FROM ({history_sql})").pl()

for unique_mol in unique_molecules:
assert any(
c == f"unique__{unique_mol}" or c.startswith(f"unique__{unique_mol}__")
for c in t.columns
), f"Missing unique molecule '{unique_mol}' in Parquet output"
assert t.schema[f"unique__{unique_mol}__unique_index"] == pl.List(pl.Int64), (
f"Expected column 'unique__{unique_mol}__unique_index' to have dtype list[int]"
)


@pytest.mark.slow
Expand All @@ -454,6 +465,37 @@ def test_translation_flag_harness(flag_overrides):
run_two_second_simulation(flag_overrides)


def test_emit_paths(parquet_out_dir):
    """
    Check that ``emit_paths`` limits the columns of the Parquet dataset.

    The simulation described by ``configs/test_emit_paths.json`` is run with
    its emitter output redirected to ``parquet_out_dir``. The history dataset
    is then read back through DuckDB, and its column names must be exactly
    the configured emit paths (each joined with ``__``) plus the identifier
    columns (time and Hive partition IDs) listed in the test body.

    Args:
        parquet_out_dir: Directory that receives the Parquet output
            (supplied by the ``parquet_out_dir`` fixture).
    """
    config_file = CONFIG_DIR_PATH + "test_emit_paths.json"
    sim = EcoliSim.from_file(config_file)
    sim.config["emitter_arg"] = {"out_dir": parquet_out_dir}
    sim.build_ecoli()
    sim.run()
    # Finalize so the emitter converts any remaining batched emits to
    # Parquet before the dataset is queried.
    sim.ecoli_experiment.emitter.finalize()

    experiment_ids = [sim.experiment_id]
    history_query, _, _ = dataset_sql(parquet_out_dir, experiment_ids)
    history = create_duckdb_conn().sql(f"SELECT * FROM ({history_query})").pl()

    # Columns that are expected regardless of the configured emit paths.
    identifier_cols = {
        "time",
        "agent_id",
        "experiment_id",
        "generation",
        "lineage_seed",
        "variant",
    }
    # Nested store paths are flattened into column names with "__".
    requested_cols = {"__".join(path) for path in sim.config["emit_paths"]}
    expected_cols = requested_cols | identifier_cols
    actual_cols = set(history.columns)
    assert actual_cols == expected_cols, (
        f"Expected columns {expected_cols} but got {actual_cols}"
    )


test_library = {
"1": test_division,
"2": test_division_topology,
Expand All @@ -462,6 +504,7 @@ def test_translation_flag_harness(flag_overrides):
"5": test_emit_unique,
"6": test_translation_flag_harness,
"7": test_daughter_state_includes_non_agent_state,
"8": test_emit_paths,
}

# run experiments in test_library from the command line with:
Expand Down
11 changes: 9 additions & 2 deletions ecoli/experiments/ecoli_master_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ def __init__(self, config: dict[str, Any]):
"""
# Do some datatype pre-processing
config["processes"] = {process: None for process in config["processes"]}
config["emit_paths"] = [tuple(path) for path in config["emit_paths"]]

# Keep track of base experiment id
# in case multiple simulations are run with suffix_time = True.
Expand Down Expand Up @@ -706,6 +707,7 @@ def build_ecoli(self):
"agents",
self.agent_id,
)
self.emit_stores = [path + emit_path for emit_path in self.emit_paths]

# get initial state
initial_cell_state = ecoli_composer.initial_state()
Expand Down Expand Up @@ -852,7 +854,7 @@ def run(self):
metadata["output_metadata"] = self.output_metadata()
# make the experiment
if isinstance(self.emitter, str):
self.emitter_config = {"type": self.emitter}
self.emitter_config = {"type": self.emitter, "emit_paths": self.emit_paths}
Comment thread
cplong90 marked this conversation as resolved.
if self.emitter_arg is not None:
for key, value in self.emitter_arg.items():
self.emitter_config[key] = value
Expand Down Expand Up @@ -920,7 +922,7 @@ def run(self):
if self.config["emit_paths"]:
self.ecoli_experiment.state.set_emit_values([tuple()], False)
self.ecoli_experiment.state.set_emit_values(
self.config["emit_paths"],
self.emit_stores,
True,
Comment thread
thalassemia marked this conversation as resolved.
)

Expand Down Expand Up @@ -1092,6 +1094,11 @@ def main():
ecoli_sim = EcoliSim.from_cli()
ecoli_sim.build_ecoli()
ecoli_sim.run()
# When max_duration is specified, a simulation can finish without dividing
# or raising an Exception. In this case, we still want to finalize the
# Parquet emitter to ensure all buffered data is written to file.
if isinstance(ecoli_sim.ecoli_experiment.emitter, ParquetEmitter):
ecoli_sim.ecoli_experiment.emitter.finalize()


if __name__ == "__main__":
Expand Down
20 changes: 20 additions & 0 deletions ecoli/library/parquet_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ def __init__(self, config: dict[str, Any]) -> None:
group (optional, default: 400),
'threaded': Whether to write Parquet files
in a background thread (optional, default: True),
'emit_paths': List of tuple paths to include
in emitted data (optional, omit agent path),
# One of the following is REQUIRED
'out_dir': local output directory (absolute/relative),
'out_uri': Google Cloud storage bucket URI
Expand Down Expand Up @@ -909,6 +911,14 @@ def __init__(self, config: dict[str, Any]) -> None:
self.last_batch_future.set_result(None)
# Set either by EcoliSim or by EngineProcess if sim reaches division
self.success = False
# Convert tuple paths to flat key prefixes for fast filtering
emit_paths = config.get("emit_paths", [])
if len(emit_paths) > 0:
self.emit_prefixes: Optional[set[str]] = {
"__".join(path) for path in emit_paths
}
else:
self.emit_prefixes = None

def finalize(self):
"""Convert remaining batched emits to Parquet at sim shutdown
Expand Down Expand Up @@ -1054,6 +1064,16 @@ def emit(self, data: dict[str, Any]):
for agent_data in data["data"]["agents"].values():
agent_data["time"] = float(data["data"]["time"])
agent_data = flatten_dict(agent_data)
if self.emit_prefixes is not None:
agent_data = {
k: v
for k, v in agent_data.items()
if k == "time"
or any(
k == prefix or k.startswith(prefix + "__")
for prefix in self.emit_prefixes
)
}
emit_idx = self.num_emits % self.batch_size
# At every emit, each field can take one of two paths.
#
Expand Down
12 changes: 8 additions & 4 deletions ecoli/library/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ def counts(states: np.ndarray, idx: int | np.ndarray) -> np.ndarray:
class get_bulk_counts(Serializer):
"""Serializer for bulk molecules that saves counts without IDs or masses."""

@staticmethod
def serialize(bulk: np.ndarray) -> np.ndarray:
"""
Args:
Expand All @@ -256,15 +257,18 @@ def serialize(bulk: np.ndarray) -> np.ndarray:
class get_unique_fields(Serializer):
"""Serializer for unique molecules."""

def serialize(unique: np.ndarray) -> list[np.ndarray]:
@staticmethod
def serialize(unique: np.ndarray) -> dict[str, np.ndarray]:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
"""
Args:
unique: Numpy structured array of attributes for one unique molecule

Returns:
List of contiguous (required by orjson) arrays, one for each attribute
Mapping of attributes to contiguous (required by orjson) arrays
"""
return [np.ascontiguousarray(unique[field]) for field in unique.dtype.names]
return {
field: np.ascontiguousarray(unique[field]) for field in unique.dtype.names
}


def numpy_schema(name: str, emit: bool = True) -> Dict[str, Any]:
Expand All @@ -288,7 +292,7 @@ def numpy_schema(name: str, emit: bool = True) -> Dict[str, Any]:
# Since vivarium-core ensures that each store will only have a single
# updater, it's OK to create new UniqueNumpyUpdater objects each time
schema["_updater"] = UniqueNumpyUpdater().updater
# Convert to list of contiguous Numpy arrays for faster and more
# Convert to dictionary of contiguous Numpy arrays for faster and more
# efficient serialization (still do not recommend emitting unique)
schema["_serializer"] = get_unique_fields
schema["_divider"] = UNIQUE_DIVIDERS[name]
Expand Down
Loading