From 29a537cd868ea785c0eb6528947d0c9d41d58500 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 22 Jun 2026 18:34:53 -0700 Subject: [PATCH 1/4] feat(amber): add loop-bookkeeping columns to materialized State (dormant) Extend the cross-region State materialization format from 1 column (content) to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri. The loop bookkeeping is promoted to first-class columns (never inside the content JSON), and the transport carries them: OutputManager state writer + emit, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites. Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the loop columns to 0/"", so every existing non-loop caller is unchanged, and fromTuple/from_tuple read only the content column. The columns activate only once the loop operators set them (follow-up PR). State materialization is intra-execution (execution-scoped iceberg URI, recreated fresh each run), so no backward-compatible read of old 1-column data is needed. Extracted from #5700 (loop operators); part of #4442. --- .../architecture/packaging/output_manager.py | 110 +++--- amber/src/main/python/core/models/payload.py | 9 + amber/src/main/python/core/models/state.py | 34 +- .../python/core/runnables/network_receiver.py | 7 +- .../python/core/runnables/network_sender.py | 15 +- ...ut_port_materialization_reader_runnable.py | 18 +- .../messaginglayer/OutputManager.scala | 2 +- .../pythonworker/PythonProxyClient.scala | 2 +- .../packaging/test_output_manager.py | 23 +- .../test_state_materialization_e2e.py | 361 +++++++++--------- .../src/test/python/core/models/test_state.py | 26 +- .../core/runnables/test_network_receiver.py | 13 +- ...ut_port_materialization_reader_runnable.py | 29 +- .../texera/amber/core/state/State.scala | 23 +- .../texera/amber/core/state/StateSpec.scala | 8 +- .../result/iceberg/IcebergDocumentSpec.scala | 4 +- .../texera/amber/util/ArrowUtilsSpec.scala | 47 +++ 17 files changed, 451 insertions(+), 280 deletions(-) diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index b85e3e39bf1..38fb18211bf 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -133,47 +133,31 @@ def set_up_port_storage_writer(self, port_id: PortIdentity, storage_uri_base: st state materialization on the same port. `storage_uri_base` is the port's base URI; the result and state URIs are derived from it. """ - document, _ = DocumentFactory.open_document( - VFSURIFactory.result_uri(storage_uri_base) - ) - buffered_item_writer = document.writer(str(get_worker_index(self.worker_id))) - writer_queue = Queue() - port_storage_writer = PortStorageWriter( - buffered_item_writer=buffered_item_writer, queue=writer_queue - ) - writer_thread = threading.Thread( - target=port_storage_writer.run, - daemon=True, - name=f"port_storage_writer_thread_{port_id}", - ) - writer_thread.start() - self._port_storage_writers[port_id] = ( - writer_queue, - port_storage_writer, - writer_thread, - ) - state_document, _ = DocumentFactory.open_document( - VFSURIFactory.state_uri(storage_uri_base) - ) - state_buffered_item_writer = state_document.writer( - str(get_worker_index(self.worker_id)) - ) - state_writer_queue = Queue() - state_port_writer = PortStorageWriter( - buffered_item_writer=state_buffered_item_writer, - queue=state_writer_queue, - ) - state_writer_thread = threading.Thread( - target=state_port_writer.run, - daemon=True, - name=f"port_state_writer_thread_{port_id}", + def start_writer(uri: str, name_prefix: str, registry: dict) -> None: + document, _ = DocumentFactory.open_document(uri) + writer_queue = Queue() + writer = PortStorageWriter( + buffered_item_writer=document.writer( + str(get_worker_index(self.worker_id)) + ), + queue=writer_queue, + ) + thread = threading.Thread( + target=writer.run, daemon=True, name=f"{name_prefix}_{port_id}" + ) + thread.start() + registry[port_id] = (writer_queue, writer, thread) + + start_writer( + VFSURIFactory.result_uri(storage_uri_base), + "port_storage_writer_thread", + self._port_storage_writers, ) - state_writer_thread.start() - self._port_state_writers[port_id] = ( - state_writer_queue, - state_port_writer, - state_writer_thread, + start_writer( + VFSURIFactory.state_uri(storage_uri_base), + "port_state_writer_thread", + self._port_state_writers, ) def get_port(self, port_id=None) -> WorkerPort: @@ -203,14 +187,23 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None: PortStorageWriterElement(data_tuple=tuple_) ) - def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None: + def save_state_to_storage_if_needed( + self, + state: State, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", + port_id=None, + ) -> None: # When port_id is omitted the same state row is fanned out to # every output port's state table. This mirrors the # broadcast-to-all-workers behavior on the emit side: state is # shared context, not per-key data, so every downstream operator # (and every worker reading the materialization) needs the full # set. - element = PortStorageWriterElement(data_tuple=state.to_tuple()) + element = PortStorageWriterElement( + data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri) + ) if port_id is None: for writer_queue, _, _ in self._port_state_writers.values(): writer_queue.put(element) @@ -223,18 +216,16 @@ def close_port_storage_writers(self) -> None: writer threads to finish, which indicates the port storage writing are finished. """ - for _, writer, _ in self._port_storage_writers.values(): - # This non-blocking stop call will let the storage writers - # flush the remaining buffer - writer.stop() - for _, _, writer_thread in self._port_storage_writers.values(): - # This blocking call will wait for all the writer to finish commit - writer_thread.join() - for _, state_writer, _ in self._port_state_writers.values(): - state_writer.stop() - for _, _, state_writer_thread in self._port_state_writers.values(): - state_writer_thread.join() - self._port_state_writers.clear() + for registry in (self._port_storage_writers, self._port_state_writers): + # Non-blocking stop lets each writer flush its remaining buffer; + # the join then waits for the commit to finish. + for _, writer, _ in registry.values(): + writer.stop() + for _, _, thread in registry.values(): + thread.join() + # Drop the stopped writers so a later close doesn't act on + # stale entries. + registry.clear() def add_partitioning(self, tag: PhysicalLink, partitioning: Partitioning) -> None: """ @@ -290,7 +281,11 @@ def emit_ecm( ) def emit_state( - self, state: State + self, + state: State, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", ) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]: return chain( *( @@ -298,7 +293,12 @@ def emit_state( ( receiver, ( - StateFrame(payload) + StateFrame( + payload, + loop_counter=loop_counter, + loop_start_id=loop_start_id, + loop_start_state_uri=loop_start_state_uri, + ) if isinstance(payload, State) else self.tuple_to_frame(payload) ), diff --git a/amber/src/main/python/core/models/payload.py b/amber/src/main/python/core/models/payload.py index 61a33294882..3f1ec4e7fe3 100644 --- a/amber/src/main/python/core/models/payload.py +++ b/amber/src/main/python/core/models/payload.py @@ -34,3 +34,12 @@ class DataFrame(DataPayload): @dataclass class StateFrame(DataPayload): frame: State + # Loop-control bookkeeping owned by the worker runtime, carried alongside + # the State payload (not inside it) so it never collides with user state. + # Defaults are the "no loop" values for all non-loop state. + loop_counter: int = 0 + # Which LoopStart to jump back to, and the iceberg URI its input is read + # from. Set by the runtime on a LoopStart's output, consumed by the + # matching LoopEnd. Empty for non-loop / not-yet-stamped state. + loop_start_id: str = "" + loop_start_state_uri: str = "" diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 003aaa212ac..4fce475e499 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -25,13 +25,41 @@ class State(dict): CONTENT = "content" - SCHEMA = Schema(raw_schema={CONTENT: "STRING"}) + # Loop-control bookkeeping owned by the worker runtime, NOT user state -- it + # never appears in the content JSON. In memory it rides on the StateFrame + # envelope; it is materialized/serialized as its own column (parallel to + # content) by to_tuple(...). from_tuple() returns the bare State; callers + # that need these values read the corresponding columns off the tuple. + LOOP_COUNTER = "loop_counter" + LOOP_START_ID = "loop_start_id" + LOOP_START_STATE_URI = "loop_start_state_uri" + SCHEMA = Schema( + raw_schema={ + CONTENT: "STRING", + LOOP_COUNTER: "LONG", + LOOP_START_ID: "STRING", + LOOP_START_STATE_URI: "STRING", + } + ) def to_json(self) -> str: return json.dumps(_to_json_value(self), separators=(",", ":")) - def to_tuple(self) -> Tuple: - return Tuple({State.CONTENT: self.to_json()}, schema=State.SCHEMA) + def to_tuple( + self, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", + ) -> Tuple: + return Tuple( + { + State.CONTENT: self.to_json(), + State.LOOP_COUNTER: int(loop_counter), + State.LOOP_START_ID: loop_start_id, + State.LOOP_START_STATE_URI: loop_start_state_uri, + }, + schema=State.SCHEMA, + ) @classmethod def from_json(cls, payload: str) -> "State": diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py index 8ba4fbe1472..2c672a07750 100644 --- a/amber/src/main/python/core/runnables/network_receiver.py +++ b/amber/src/main/python/core/runnables/network_receiver.py @@ -96,7 +96,12 @@ def data_handler(command: bytes, table: Table) -> int: "Data", lambda _: DataFrame(table), "State", - lambda _: StateFrame(State.from_json(table[State.CONTENT][0].as_py())), + lambda _: StateFrame( + State.from_json(table[State.CONTENT][0].as_py()), + loop_counter=int(table[State.LOOP_COUNTER][0].as_py()), + loop_start_id=table[State.LOOP_START_ID][0].as_py(), + loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(), + ), "ECM", lambda _: EmbeddedControlMessage().parse(table["payload"][0].as_py()), ) diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py index d8e3889ac11..68d89e0ebf1 100644 --- a/amber/src/main/python/core/runnables/network_sender.py +++ b/amber/src/main/python/core/runnables/network_sender.py @@ -20,7 +20,13 @@ from overrides import overrides from typing import Optional -from core.models import DataPayload, InternalQueue, DataFrame, State, StateFrame +from core.models import ( + DataPayload, + InternalQueue, + DataFrame, + State, + StateFrame, +) from core.models.internal_queue import ( InternalQueueElement, DataElement, @@ -100,7 +106,12 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None: elif isinstance(data_payload, StateFrame): data_header = PythonDataHeader(tag=to, payload_type="State") table = pa.Table.from_pydict( - {State.CONTENT: [data_payload.frame.to_json()]}, + { + State.CONTENT: [data_payload.frame.to_json()], + State.LOOP_COUNTER: [int(data_payload.loop_counter)], + State.LOOP_START_ID: [data_payload.loop_start_id], + State.LOOP_START_STATE_URI: [data_payload.loop_start_state_uri], + }, schema=State.SCHEMA.as_arrow_schema(), ) self._proxy_client.send_data(bytes(data_header), table) diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index 3e0e2d48ab5..97a24fe073d 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -34,7 +34,14 @@ from core.architecture.sendsemantics.round_robin_partitioner import ( RoundRobinPartitioner, ) -from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame +from core.models import ( + Tuple, + InternalQueue, + DataFrame, + DataPayload, + State, + StateFrame, +) from core.models.internal_queue import DataElement, ECMElement from core.storage.document_factory import DocumentFactory from core.storage.vfs_uri_factory import VFSURIFactory @@ -152,7 +159,14 @@ def run(self) -> None: VFSURIFactory.state_uri(self.uri) ) for state_row in state_document.get(): - self.emit_payload(StateFrame(State.from_tuple(state_row))) + self.emit_payload( + StateFrame( + State.from_tuple(state_row), + loop_counter=state_row[State.LOOP_COUNTER], + loop_start_id=state_row[State.LOOP_START_ID], + loop_start_state_uri=state_row[State.LOOP_START_STATE_URI], + ) + ) storage_iterator = self.materialization.get() # Iterate and process tuples. diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 030fa3a3bbd..80cc24780b0 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -242,7 +242,7 @@ class OutputManager( // emit side: state is shared context, not per-key data, so every // downstream operator (and every worker reading the materialization) // needs the full set. - stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple))) + stateWriterThreads.values.foreach(_.queue.put(Left(state.toTuple()))) } /** diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala index 6618e857b1d..144c3ac57a3 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala @@ -125,7 +125,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu case DataFrame(frame) => writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), from, "Data") case StateFrame(state) => - writeArrowStream(mutable.Queue(state.toTuple), from, "State") + writeArrowStream(mutable.Queue(state.toTuple()), from, "State") } } diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py index dcf7ccde673..d132bd9b623 100644 --- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py +++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py @@ -49,15 +49,15 @@ def port_b(self): @pytest.fixture def state(self): - return State({"loop_counter": 1, "i": 2}) + return State({"i": 2}) def test_no_state_writers_is_a_noop(self, output_manager, state): # With no port set up, save_state_to_storage_if_needed must not # touch any writer. - output_manager.save_state_to_storage_if_needed(state) # no-op + output_manager.save_state_to_storage_if_needed(state, 0) # no-op def test_unknown_port_id_is_a_noop(self, output_manager, state, port_a): - output_manager.save_state_to_storage_if_needed(state, port_id=port_a) + output_manager.save_state_to_storage_if_needed(state, 0, port_id=port_a) # No assertion needed -- the absence of any writer means nothing # was attempted. @@ -67,7 +67,7 @@ def test_enqueues_to_every_port_when_port_id_omitted( queue_a, _, _ = _stub_state_writer(output_manager, port_a) queue_b, _, _ = _stub_state_writer(output_manager, port_b) - output_manager.save_state_to_storage_if_needed(state) + output_manager.save_state_to_storage_if_needed(state, 0) # Each port's writer queue receives one PortStorageWriterElement. # Critically, save is non-blocking -- the call must not invoke @@ -84,7 +84,7 @@ def test_enqueues_only_to_selected_port_when_port_id_specified( queue_a, _, _ = _stub_state_writer(output_manager, port_a) queue_b, _, _ = _stub_state_writer(output_manager, port_b) - output_manager.save_state_to_storage_if_needed(state, port_id=port_a) + output_manager.save_state_to_storage_if_needed(state, 0, port_id=port_a) assert queue_a.put.call_count == 1 queue_b.put.assert_not_called() @@ -105,3 +105,16 @@ def test_close_port_storage_writers_stops_state_threads( thread_a.join.assert_called_once() thread_b.join.assert_called_once() assert output_manager._port_state_writers == {} + + def test_defaults_loop_columns_when_omitted(self, output_manager, state, port_a): + # Dormancy: callers that pass no loop bookkeeping (every non-loop + # caller, e.g. MainLoop.process_input_state) still produce a valid + # 4-column state tuple with the loop columns at their no-loop defaults. + queue_a, _, _ = _stub_state_writer(output_manager, port_a) + + output_manager.save_state_to_storage_if_needed(state) # no loop_counter + + data_tuple = queue_a.put.call_args.args[0].data_tuple + assert data_tuple[State.LOOP_COUNTER] == 0 + assert data_tuple[State.LOOP_START_ID] == "" + assert data_tuple[State.LOOP_START_STATE_URI] == "" diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index 8613be95b18..43e2959cd95 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -25,7 +25,7 @@ OutputManager.set_up_port_storage_writer(port, base_uri) → real PortStorageWriter thread - → real IcebergTableWriter (postgres-backed JdbcCatalog) + → real IcebergTableWriter (sqlite-backed SqlCatalog) → state document at VFSURIFactory.state_uri(base_uri) → InputPortMaterializationReaderRunnable.run() → DataElement(StateFrame) on the consumer's input queue @@ -33,20 +33,14 @@ and asserts that a state put through `save_state_to_storage_if_needed` on the producer side actually arrives at the consumer's queue, with the same payload. - -Marked @integration so the CI runner that has postgres + iceberg -catalog DB provisioned (amber-integration) picks it up via -`pytest -m integration`. Earlier versions of this test substituted a -sqlite-backed SqlCatalog to dodge that infra dependency; that diverged -from the prod catalog code path, so we now exercise the real one. """ -import os import tempfile import threading import uuid import pytest +from pyiceberg.catalog.sql import SqlCatalog from core.architecture.packaging.output_manager import OutputManager from core.models import State, StateFrame @@ -74,203 +68,198 @@ ) -@pytest.mark.integration -class TestStateMaterializationE2E: - @pytest.fixture(autouse=True, scope="class") - def _init_storage_config(self): - """Initialize StorageConfig + IcebergCatalogInstance for the real - postgres-backed catalog in the `amber-integration` CI job. - - Critical detail: the Scala integration tests that run earlier in - the same job connect to the iceberg catalog DB as user - `postgres/postgres` (the storage.conf default for - `STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME/PASSWORD`). pyiceberg - creates the catalog's `iceberg_tables` metadata table on first - use, owned by whoever wrote first — so it ends up owned by - `postgres`. We MUST connect as the same user, otherwise we hit - `permission denied for table iceberg_tables`. +# Module-level scratch dir for the sqlite catalog + iceberg warehouse. +# We don't initialize `StorageConfig` here: other test modules (e.g. +# test_iceberg_document.py) also call `StorageConfig.initialize` at +# import time, and the class rejects re-initialization with +# RuntimeError. Whichever module gets collected first wins; we adopt +# its namespaces below. +_WAREHOUSE_DIR = tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-") - Why the reset: `test_iceberg_document.py` also calls - `StorageConfig.initialize` at module import time (with a - different `texera/password` user that works for it because no - Scala writes first in the `pyamber` job where it runs). pytest - imports every test module during collection, even ones whose - tests will be deselected by `-m integration`, so that - initialization happens here too. We force-reset the singletons - and re-init with the prod-correct credentials; safe because - test_iceberg_document's tests are deselected from this run. - All catalog + S3 settings read the same `STORAGE_*` env vars - the production code consumes (via storage.conf), so the test - matches whichever identity the Scala side uses in the same job - and stays aligned with the bucket / endpoint the workflow - provisions. Defaults mirror storage.conf so a local sbt run - without those vars exported still works. +@pytest.fixture(scope="module", autouse=True) +def sqlite_iceberg_catalog(): + """Inject a sqlite-backed SqlCatalog so the test runs without external + iceberg infra (postgres/minio). - Class-scoped so the reset + tempdir allocation happens once - per class; the two tests in this class share state through the - same StorageConfig singleton anyway. - """ - StorageConfig._initialized = False - IcebergCatalogInstance._instance = None - large_binaries_bucket = os.environ.get( - "STORAGE_S3_LARGE_BINARIES_BUCKET", "texera-large-binaries" - ) + Module-scoped so all tests in this file share one warehouse, and so + namespace creation only happens once. We save/restore the original + `IcebergCatalogInstance` singleton so other test modules that expect + a real postgres-backed catalog (e.g. test_iceberg_document.py) are + not affected by our replacement. + """ + # Some other test module may have initialized StorageConfig already + # (it has a single-init lock). If nothing has initialized it yet, + # do it here with arbitrary values -- we replace the catalog + # instance below so the postgres/rest fields are never exercised. + if not StorageConfig._initialized: StorageConfig.initialize( catalog_type="postgres", - postgres_uri_without_scheme=os.environ.get( - "STORAGE_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME", - "localhost:5432/texera_iceberg_catalog", - ), - postgres_username=os.environ.get( - "STORAGE_ICEBERG_CATALOG_POSTGRES_USERNAME", "postgres" - ), - postgres_password=os.environ.get( - "STORAGE_ICEBERG_CATALOG_POSTGRES_PASSWORD", "postgres" - ), - rest_catalog_uri="http://localhost:8181/catalog/", - rest_catalog_warehouse_name="texera", + postgres_uri_without_scheme="unused", + postgres_username="unused", + postgres_password="unused", + rest_catalog_uri="unused", + rest_catalog_warehouse_name="unused", table_result_namespace="operator-port-result", table_state_namespace="operator-port-state", - directory_path=tempfile.mkdtemp(prefix="texera-state-e2e-warehouse-"), + directory_path=_WAREHOUSE_DIR, commit_batch_size=4096, - s3_endpoint=os.environ.get("STORAGE_S3_ENDPOINT", "http://localhost:9000"), - s3_region=os.environ.get("STORAGE_S3_REGION", "us-west-2"), - s3_auth_username=os.environ.get("STORAGE_S3_AUTH_USERNAME", "texera_minio"), - s3_auth_password=os.environ.get("STORAGE_S3_AUTH_PASSWORD", "password"), - s3_large_binaries_base_uri=f"s3://{large_binaries_bucket}/objects/0/", + s3_endpoint="unused", + s3_region="unused", + s3_auth_username="unused", + s3_auth_password="unused", + s3_large_binaries_base_uri="s3://texera-large-binaries/objects/0/", ) - @pytest.fixture - def base_uri(self) -> str: - """A unique port-base URI per test so tables don't collide.""" - return VFSURIFactory.create_port_base_uri( - WorkflowIdentity(id=0), - ExecutionIdentity(id=0), - GlobalPortIdentity( - op_id=PhysicalOpIdentity( - logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"), - layer_name="main", - ), - port_id=PortIdentity(id=0, internal=False), - input=False, + original_instance = IcebergCatalogInstance._instance + db_path = f"{_WAREHOUSE_DIR}/catalog.sqlite" + catalog = SqlCatalog( + "texera_iceberg_e2e", + **{ + "uri": f"sqlite:///{db_path}", + "warehouse": f"file://{_WAREHOUSE_DIR}", + }, + ) + # Adopt whatever namespaces StorageConfig already has -- those are + # the ones DocumentFactory will route into. + catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_RESULT_NAMESPACE) + catalog.create_namespace_if_not_exists(StorageConfig.ICEBERG_TABLE_STATE_NAMESPACE) + IcebergCatalogInstance.replace_instance(catalog) + try: + yield catalog + finally: + IcebergCatalogInstance.replace_instance(original_instance) + + +def _fresh_base_uri() -> str: + """A unique port-base URI per test so tables don't collide.""" + return VFSURIFactory.create_port_base_uri( + WorkflowIdentity(id=0), + ExecutionIdentity(id=0), + GlobalPortIdentity( + op_id=PhysicalOpIdentity( + logical_op_id=OperatorIdentity(id=f"e2e-{uuid.uuid4().hex}"), + layer_name="main", ), - ) + port_id=PortIdentity(id=0, internal=False), + input=False, + ), + ) - @pytest.fixture - def producer(self, base_uri): - """An OutputManager wired to the iceberg result + state documents - at `base_uri`. Closes its writer threads on teardown so cached - buffers are flushed even if a test errors out before - `close_port_storage_writers()`. - """ - # RegionExecutionCoordinator's responsibility in prod: provision - # result + state documents at the port base URI before any - # worker starts. We emulate that here. - DocumentFactory.create_document( - VFSURIFactory.result_uri(base_uri), State.SCHEMA - ) - DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) - mgr = OutputManager(worker_id="Worker:WF0-test-producer-main-0") - mgr.add_output_port( - PortIdentity(id=0, internal=False), - schema=State.SCHEMA, - storage_uri_base=base_uri, - ) - try: - yield mgr - finally: - # close_port_storage_writers is idempotent — fine to call - # again here if the test already closed. - try: - mgr.close_port_storage_writers() - except Exception: - pass +def test_state_written_by_output_manager_is_replayed_by_reader(): + """Producer side writes a state via OutputManager; consumer side reads + it via InputPortMaterializationReaderRunnable. The state must arrive + on the consumer's input queue intact. + """ + base_uri = _fresh_base_uri() + port_id = PortIdentity(id=0, internal=False) + worker_schema_for_result = State.SCHEMA # producer-side: only state matters - def test_state_written_by_output_manager_is_replayed_by_reader( - self, base_uri, producer - ): - """Producer side writes a state via OutputManager; consumer side - reads it via InputPortMaterializationReaderRunnable. The state - must arrive on the consumer's input queue intact. - """ - # Drive a state through the producer-side path. - state = State({"flag": True, "loop_counter": 7, "name": "outer"}) - producer.save_state_to_storage_if_needed(state) + # 1. RegionExecutionCoordinator's responsibility: provision result + + # state documents at the port base URI before any worker starts. + # We emulate that here. + DocumentFactory.create_document( + VFSURIFactory.result_uri(base_uri), worker_schema_for_result + ) + DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) - # Force the writer threads to flush + commit by closing them. - # Without this, the iceberg buffer holds the state in memory - # and nothing is durable yet. - producer.close_port_storage_writers() + # 2. Producer side: spin up an OutputManager, set up real state + + # result writer threads against the iceberg storage. + producer = OutputManager(worker_id="Worker:WF0-test-producer-main-0") + producer.add_output_port( + port_id, schema=worker_schema_for_result, storage_uri_base=base_uri + ) - # Consumer side: spin up the materialization reader against the - # same base URI. Each reader needs a partitioning even when no - # real downstream worker exists — supply a OneToOnePartitioning - # whose only receiver is the consumer worker itself. - consumer_worker = ActorVirtualIdentity(name="consumer-worker-0") - consumer_queue = InternalQueue() - partitioning = Partitioning( - one_to_one_partitioning=OneToOnePartitioning( - batch_size=400, - channels=[ - ChannelIdentity( - from_worker_id=ActorVirtualIdentity(name="producer-worker-0"), - to_worker_id=consumer_worker, - is_control=False, - ) - ], - ) - ) - reader = InputPortMaterializationReaderRunnable( - uri=base_uri, - queue=consumer_queue, - worker_actor_id=consumer_worker, - partitioning=partitioning, + # 3. Drive a state through the producer-side path. loop_counter rides + # alongside the State (not inside it) and is materialized as its own column. + state = State({"flag": True, "name": "outer"}) + producer.save_state_to_storage_if_needed(state, 7) + + # 4. Force the writer threads to flush + commit by closing them. + # Without this, the iceberg buffer holds the state in memory and + # nothing is durable yet. + producer.close_port_storage_writers() + + # 5. Consumer side: spin up the materialization reader against the + # same base URI. Each reader needs a partitioning even when no real + # downstream worker exists -- supply a OneToOnePartitioning whose + # only receiver is the consumer worker itself. + consumer_worker = ActorVirtualIdentity(name="consumer-worker-0") + consumer_queue = InternalQueue() + partitioning = Partitioning( + one_to_one_partitioning=OneToOnePartitioning( + batch_size=400, + channels=[ + ChannelIdentity( + from_worker_id=ActorVirtualIdentity(name="producer-worker-0"), + to_worker_id=consumer_worker, + is_control=False, + ) + ], ) + ) + reader = InputPortMaterializationReaderRunnable( + uri=base_uri, + queue=consumer_queue, + worker_actor_id=consumer_worker, + partitioning=partitioning, + ) - # Run the reader on a worker thread so we can time out cleanly - # if something goes wrong. - reader_thread = threading.Thread(target=reader.run, daemon=True) - reader_thread.start() - reader_thread.join(timeout=30) - assert not reader_thread.is_alive(), "reader did not finish within timeout" - assert reader.finished(), "reader exited but did not mark itself finished" + # Run the reader on a worker thread so we can time out cleanly if + # something goes wrong. + reader_thread = threading.Thread(target=reader.run, daemon=True) + reader_thread.start() + reader_thread.join(timeout=30) + assert not reader_thread.is_alive(), "reader did not finish within timeout" + assert reader.finished(), "reader exited but did not mark itself finished" - # Drain the consumer's queue and find the StateFrame(s). - state_frames: list[State] = [] - while not consumer_queue.is_empty(): - elem = consumer_queue.get() - if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame): - state_frames.append(elem.payload.frame) + # 6. Drain the consumer's queue and find the StateFrame(s). + state_frames: list[StateFrame] = [] + while not consumer_queue.is_empty(): + elem = consumer_queue.get() + if isinstance(elem, DataElement) and isinstance(elem.payload, StateFrame): + state_frames.append(elem.payload) - assert len(state_frames) == 1, ( - f"expected exactly one State to flow through writer→iceberg→reader; " - f"got {len(state_frames)}: {state_frames}" - ) - assert state_frames[0] == state, ( - f"replayed state did not match what was written; " - f"wrote={state}, read={state_frames[0]}" - ) + assert len(state_frames) == 1, ( + f"expected exactly one State to flow through writer→iceberg→reader; " + f"got {len(state_frames)}: {state_frames}" + ) + assert state_frames[0].frame == state, ( + f"replayed state did not match what was written; " + f"wrote={state}, read={state_frames[0].frame}" + ) + assert state_frames[0].loop_counter == 7, ( + f"loop_counter must round-trip through its own column; " + f"got {state_frames[0].loop_counter}" + ) - def test_state_table_persists_across_writer_close(self, base_uri, producer): - """Independently verify the iceberg state table contains the row. - If this passes but the reader test above fails, the bug is in - the reader / consumer wiring; if this fails, the bug is in the - writer / storage layer. - """ - state = State({"flag": False, "checkpoint": 42}) - producer.save_state_to_storage_if_needed(state) - producer.close_port_storage_writers() - # Read directly from the iceberg state document, bypassing the - # reader. - state_document, _ = DocumentFactory.open_document( - VFSURIFactory.state_uri(base_uri) - ) - rows = list(state_document.get()) - assert len(rows) == 1, ( - f"expected exactly one row in the iceberg state table after " - f"the writer was closed; got {len(rows)} rows" - ) - assert State.from_tuple(rows[0]) == state +def test_state_table_persists_across_writer_close(): + """Independently verify the iceberg state table contains the row. + If this passes but the reader test above fails, the bug is in the + reader / consumer wiring; if this fails, the bug is in the writer / + storage layer. + """ + base_uri = _fresh_base_uri() + port_id = PortIdentity(id=0, internal=False) + + DocumentFactory.create_document(VFSURIFactory.result_uri(base_uri), State.SCHEMA) + DocumentFactory.create_document(VFSURIFactory.state_uri(base_uri), State.SCHEMA) + + producer = OutputManager(worker_id="Worker:WF0-test-producer2-main-0") + producer.add_output_port(port_id, schema=State.SCHEMA, storage_uri_base=base_uri) + + state = State({"flag": False, "checkpoint": 42}) + producer.save_state_to_storage_if_needed(state, 0) + producer.close_port_storage_writers() + + # Read directly from the iceberg state document, bypassing the reader. + state_document, _ = DocumentFactory.open_document(VFSURIFactory.state_uri(base_uri)) + rows = list(state_document.get()) + assert len(rows) == 1, ( + f"expected exactly one row in the iceberg state table after the " + f"writer was closed; got {len(rows)} rows" + ) + assert State.from_tuple(rows[0]) == state + assert rows[0][State.LOOP_COUNTER] == 0 diff --git a/amber/src/test/python/core/models/test_state.py b/amber/src/test/python/core/models/test_state.py index aef2297130b..b51a01267bb 100644 --- a/amber/src/test/python/core/models/test_state.py +++ b/amber/src/test/python/core/models/test_state.py @@ -29,7 +29,17 @@ def test_state_subclasses_dict(self): def test_class_attributes(self): assert State.CONTENT == "content" - assert State.SCHEMA.get_attr_names() == ["content"] + assert State.LOOP_COUNTER == "loop_counter" + assert State.LOOP_START_ID == "loop_start_id" + assert State.LOOP_START_STATE_URI == "loop_start_state_uri" + # The loop-control columns are runtime-owned bookkeeping, sibling to + # content, not part of the user state JSON. + assert State.SCHEMA.get_attr_names() == [ + "content", + "loop_counter", + "loop_start_id", + "loop_start_state_uri", + ] def test_json_round_trip_primitives(self): original = State( @@ -83,14 +93,20 @@ class Custom: State({"bad": Custom()}).to_json() def test_tuple_round_trip(self): - original = State({"loop_counter": 3, "label": "outer", "blob": b"\x01\x02"}) + original = State({"i": 3, "label": "outer", "blob": b"\x01\x02"}) decoded = State.from_tuple(original.to_tuple()) assert decoded == original - def test_to_tuple_uses_state_schema(self): - tuple_ = State({"x": 1}).to_tuple() - # Single STRING column whose value is the JSON serialization. + def test_to_tuple_writes_content_and_loop_counter_columns(self): + tuple_ = State({"x": 1}).to_tuple(7) + # content holds the JSON serialization; loop_counter is its own column. assert tuple_[State.CONTENT] == '{"x":1}' + assert tuple_[State.LOOP_COUNTER] == 7 + # loop_counter must not leak into the content JSON. + assert "loop_counter" not in tuple_[State.CONTENT] + + def test_to_tuple_defaults_loop_counter_to_zero(self): + assert State({"x": 1}).to_tuple()[State.LOOP_COUNTER] == 0 def test_nested_dict_decodes_to_plain_dict(self): # Top-level returns a State; nested dicts come back as plain dict. diff --git a/amber/src/test/python/core/runnables/test_network_receiver.py b/amber/src/test/python/core/runnables/test_network_receiver.py index bf890e4a2f0..49ba2408efa 100644 --- a/amber/src/test/python/core/runnables/test_network_receiver.py +++ b/amber/src/test/python/core/runnables/test_network_receiver.py @@ -152,16 +152,19 @@ def test_network_receiver_can_receive_consecutive_state_messages( worker_id = ActorVirtualIdentity(name="test") channel_id = ChannelIdentity(worker_id, worker_id, False) + # loop_counter rides the StateFrame envelope (its own Arrow column), + # not the user content. Use a non-zero counter so the round-trip + # actually exercises the second column over the sender->receiver wire. input_queue.put( DataElement( tag=channel_id, - payload=StateFrame(State({"loop_counter": 0, "i": 1})), + payload=StateFrame(State({"i": 1}), loop_counter=0), ) ) input_queue.put( DataElement( tag=channel_id, - payload=StateFrame(State({"loop_counter": 1, "i": 2})), + payload=StateFrame(State({"i": 2}), loop_counter=5), ) ) @@ -169,11 +172,13 @@ def test_network_receiver_can_receive_consecutive_state_messages( second_element: DataElement = output_queue.get() assert isinstance(first_element.payload, StateFrame) - assert first_element.payload.frame == {"loop_counter": 0, "i": 1} + assert first_element.payload.frame == {"i": 1} + assert first_element.payload.loop_counter == 0 assert first_element.tag == channel_id assert isinstance(second_element.payload, StateFrame) - assert second_element.payload.frame == {"loop_counter": 1, "i": 2} + assert second_element.payload.frame == {"i": 2} + assert second_element.payload.loop_counter == 5 assert second_element.tag == channel_id @pytest.mark.timeout(10) diff --git a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py index 5016c2df2f1..59635d0e872 100644 --- a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py +++ b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py @@ -60,16 +60,27 @@ def runnable(self, me): return instance def test_state_rows_are_emitted_as_state_frames(self, runnable): - state_a = State({"loop_counter": 0}) - state_b = State({"loop_counter": 1}) + state_a = State({"i": 0}) + state_b = State({"i": 1}) - # The state document yields opaque tuples; from_tuple deserializes - # them. Patch from_tuple so we don't have to wire a real - # serialization. + # The state document yields opaque multi-column tuples. State.from_tuple + # (patched) deserializes the content column; the reader reads the + # loop-control columns directly off the row and carries them onto the + # emitted StateFrame envelope. + row_a = { + State.LOOP_COUNTER: 0, + State.LOOP_START_ID: "loop-a", + State.LOOP_START_STATE_URI: "vfs:///a", + } + row_b = { + State.LOOP_COUNTER: 1, + State.LOOP_START_ID: "loop-b", + State.LOOP_START_STATE_URI: "vfs:///b", + } result_doc = MagicMock() result_doc.get.return_value = iter([]) # No materialized tuples. state_doc = MagicMock() - state_doc.get.return_value = iter(["row-a", "row-b"]) + state_doc.get.return_value = iter([row_a, row_b]) with ( patch( @@ -96,4 +107,10 @@ def test_state_rows_are_emitted_as_state_frames(self, runnable): and isinstance(call.args[0].payload, StateFrame) ] assert [sf.payload.frame for sf in state_frames] == [state_a, state_b] + assert [sf.payload.loop_counter for sf in state_frames] == [0, 1] + assert [sf.payload.loop_start_id for sf in state_frames] == ["loop-a", "loop-b"] + assert [sf.payload.loop_start_state_uri for sf in state_frames] == [ + "vfs:///a", + "vfs:///b", + ] assert runnable._finished is True diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index ba146f1d57c..92103477d1d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -31,18 +31,35 @@ final case class State(values: Map[String, Any]) { def toJson: String = objectMapper.writeValueAsString(State.toJsonValue(values)) - def toTuple: Tuple = - Tuple.builder(State.schema).addSequentially(Array(toJson)).build() + def toTuple( + loopCounter: Long = 0L, + loopStartId: String = "", + loopStartStateUri: String = "" + ): Tuple = + Tuple + .builder(State.schema) + .addSequentially(Array(toJson, Long.box(loopCounter), loopStartId, loopStartStateUri)) + .build() } object State { private val Content = "content" + // loop-control bookkeeping owned by the (Python) worker runtime; not user + // state and never in the content JSON. Materialized as its own columns, + // parallel to content. Scala never originates loop state (loop operators are + // Python-only), so toTuple defaults these to the "no loop" values. + private val LoopCounter = "loop_counter" + private val LoopStartId = "loop_start_id" + private val LoopStartStateUri = "loop_start_state_uri" private val BytesTypeMarker = "__texera_type__" private val BytesValue = "bytes" private val PayloadMarker = "payload" val schema: Schema = new Schema( - new Attribute(Content, AttributeType.STRING) + new Attribute(Content, AttributeType.STRING), + new Attribute(LoopCounter, AttributeType.LONG), + new Attribute(LoopStartId, AttributeType.STRING), + new Attribute(LoopStartStateUri, AttributeType.STRING) ) def fromJson(payload: String): State = diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala index 976a585e31a..1e6a0dc0178 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala @@ -98,13 +98,13 @@ class StateSpec extends AnyFlatSpec { it should "tuple-round-trip" in { val original = State( Map( - "loop_counter" -> 3L, + "i" -> 3L, "label" -> "outer", "blob" -> Array[Byte](1, 2) ) ) - val decoded = State.fromTuple(original.toTuple) - assert(decoded.values("loop_counter") == 3L) + val decoded = State.fromTuple(original.toTuple()) + assert(decoded.values("i") == 3L) assert(decoded.values("label") == "outer") assert( decoded.values("blob").asInstanceOf[Array[Byte]].sameElements(Array[Byte](1, 2)) @@ -112,7 +112,7 @@ class StateSpec extends AnyFlatSpec { } it should "produce a tuple whose payload is the JSON serialization" in { - val tuple = State(Map("x" -> 1L)).toTuple + val tuple = State(Map("x" -> 1L)).toTuple() assert(tuple.getSchema == State.schema) assert(tuple.getField[String]("content") == """{"x":1}""") } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index d21644f6e64..f56eb31db6f 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -220,7 +220,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - writer.putOne(state.toTuple) + writer.putOne(state.toTuple()) writer.close() val storedRows = stateDocument.get().toList @@ -252,7 +252,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - states.foreach(state => writer.putOne(state.toTuple)) + states.foreach(state => writer.putOne(state.toTuple())) writer.close() val deserializedStates = diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala index 62b10a66864..486deefbcc2 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala @@ -19,8 +19,11 @@ package org.apache.texera.amber.util +import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.types.{FloatingPointPrecision, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType} +import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema} import org.apache.texera.amber.core.tuple.AttributeTypeUtils.AttributeTypeException import org.scalatest.flatspec.AnyFlatSpec @@ -282,4 +285,48 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers { any.getMetadata.get("texera_type") shouldBe "ANY" Option(name.getMetadata).map(_.containsKey("texera_type")).getOrElse(false) shouldBe false } + + // ----- Tuple <-> Arrow data round-trip (the State wire-hop contract) ----- + + "tuple round-trip through Arrow vectors" should "preserve every column of a multi-column State tuple" in { + // The Python<->Scala state wire hop goes Tuple -> setTexeraTuple -> Arrow + // (PythonProxyClient.writeArrowStream) on one side and + // Arrow -> getTexeraTuple -> Tuple (PythonProxyServer) on the other. + // The schema-only round-trip tests above don't exercise the per-row data + // encode/decode, so a column dropped or mistyped there would slip through. + // Pin that the full multi-column State tuple (content STRING + the + // loop-control columns loop_counter LONG, loop_start_id / loop_start_state_uri + // STRING) survives a real setTexeraTuple -> Arrow vectors -> getTexeraTuple + // round-trip with every column intact -- the property the wire hop relies on. + val original = + State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L, "outer-loop", "vfs:///outer") + + val allocator = new RootAllocator() + val root = VectorSchemaRoot.create(ArrowUtils.fromTexeraSchema(original.getSchema), allocator) + try { + root.allocateNew() + ArrowUtils.setTexeraTuple(original, 0, root) + root.setRowCount(1) + + val recovered = ArrowUtils.getTexeraTuple(0, root) + + // Every column survives the encode/decode, with names and types intact. + recovered.getSchema.getAttributes.toList.map(a => (a.getName, a.getType)) shouldBe + List( + ("content", AttributeType.STRING), + ("loop_counter", AttributeType.LONG), + ("loop_start_id", AttributeType.STRING), + ("loop_start_state_uri", AttributeType.STRING) + ) + // content (the user State JSON) round-trips... + State.fromTuple(recovered).values shouldBe Map("i" -> 5L, "label" -> "outer") + // ...and so do the loop-control columns. + recovered.getField[java.lang.Long]("loop_counter").toLong shouldBe 3L + recovered.getField[String]("loop_start_id") shouldBe "outer-loop" + recovered.getField[String]("loop_start_state_uri") shouldBe "vfs:///outer" + } finally { + root.close() + allocator.close() + } + } } From db66a7f9e90f445681ab87f4bff1beed493a2fbe Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 22 Jun 2026 21:56:46 -0700 Subject: [PATCH 2/4] test(amber): assert dedicated State loop columns in IcebergDocumentSpec Address review (Copilot): the materialization round-trip tests used "loop_counter" as a user-state key (landing in the content JSON), which is misleading now that loop bookkeeping is a dedicated column. Rename the user key to "i" and write/assert the loop columns via toTuple(loopCounter = ...) + row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching how StateSpec/test_state were updated. --- .../result/iceberg/IcebergDocumentSpec.scala | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index f56eb31db6f..8f52b0eceec 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -211,7 +211,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]] val state = State( Map( - "loop_counter" -> 3, + "i" -> 3, "name" -> "outer-loop", "payload" -> Array[Byte](0, 1, 2, 3), "nested" -> Map("enabled" -> true, "values" -> List(1, 2, 3)) @@ -220,13 +220,20 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - writer.putOne(state.toTuple()) + writer.putOne( + state.toTuple(loopCounter = 7L, loopStartId = "ls", loopStartStateUri = "vfs:///outer") + ) writer.close() val storedRows = stateDocument.get().toList assert(storedRows.length == 1) + // Loop bookkeeping is materialized as its own columns, not in the content JSON. + assert(storedRows.head.getField[java.lang.Long]("loop_counter").toLong == 7L) + assert(storedRows.head.getField[String]("loop_start_id") == "ls") + assert(storedRows.head.getField[String]("loop_start_state_uri") == "vfs:///outer") + // User state round-trips through the content column (fromTuple reads only content). val deserialized = State.fromTuple(storedRows.head).values - assert(deserialized("loop_counter") == 3L) + assert(deserialized("i") == 3L) assert(deserialized("name") == "outer-loop") assert(deserialized("payload").asInstanceOf[Array[Byte]].sameElements(Array[Byte](0, 1, 2, 3))) assert(deserialized("nested").asInstanceOf[Map[String, Any]]("enabled") == true) @@ -238,45 +245,45 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter DocumentFactory.createDocument(stateUri, State.schema) val stateDocument = DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]] - val states: List[State] = List( - State(Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 3))), - State( - Map( - "loop_counter" -> 1, - "i" -> 2, - "payload" -> Array[Byte](4, 5, 6), - "nested" -> Map("values" -> List(3, 4)) - ) + // (user state, loopCounter) -- the counter is written to its own column. + val states: List[(State, Long)] = List( + (State(Map("i" -> 1, "payload" -> Array[Byte](1, 2, 3))), 0L), + ( + State( + Map( + "i" -> 2, + "payload" -> Array[Byte](4, 5, 6), + "nested" -> Map("values" -> List(3, 4)) + ) + ), + 1L ) ) val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - states.foreach(state => writer.putOne(state.toTuple())) + states.foreach { case (state, loopCounter) => writer.putOne(state.toTuple(loopCounter)) } writer.close() - val deserializedStates = - stateDocument - .get() - .toList - .map(State.fromTuple) - .sortBy(_.values("loop_counter").asInstanceOf[Long]) - assert(deserializedStates.length == states.length) - deserializedStates.zip(states).foreach { - case (actual, expected) => - assert( - actual.values("loop_counter") == expected.values("loop_counter").asInstanceOf[Int].toLong - ) - assert(actual.values("i") == expected.values("i").asInstanceOf[Int].toLong) + val storedRows = + stateDocument.get().toList.sortBy(_.getField[java.lang.Long]("loop_counter").toLong) + assert(storedRows.length == states.length) + storedRows.zip(states).foreach { + case (row, (expectedState, expectedLoopCounter)) => + // loop_counter is its own column... + assert(row.getField[java.lang.Long]("loop_counter").toLong == expectedLoopCounter) + // ...and the user state round-trips through the content column. + val actual = State.fromTuple(row).values + assert(actual("i") == expectedState.values("i").asInstanceOf[Int].toLong) assert( - actual - .values("payload") + actual("payload") .asInstanceOf[Array[Byte]] - .sameElements(expected.values("payload").asInstanceOf[Array[Byte]]) + .sameElements(expectedState.values("payload").asInstanceOf[Array[Byte]]) ) } assert( - deserializedStates(1) + State + .fromTuple(storedRows(1)) .values("nested") .asInstanceOf[Map[String, Any]]("values") == List(3L, 4L) ) From 74eef1dafc5fa193a065beefcdd84efbe5d4b527 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 28 Jun 2026 09:57:46 -0700 Subject: [PATCH 3/4] refactor(amber): centralize State column mapping; strengthen round-trip tests Address review feedback on #5900: - Add `State.to_columns`, the single column-name -> value mapping for the State wire/storage format, and route both `to_tuple` (iceberg) and the network sender's StateFrame branch through it, so adding a column is a one-line change rather than an edit in every serializer. - e2e materialization test and StateSpec now round-trip all three loop columns (loop_counter, loop_start_id, loop_start_state_uri) with non-default values, not just loop_counter, so a regression in any single column's plumbing is caught. - Document why the e2e deliberately uses a hermetic sqlite catalog while the other iceberg tests use postgres/REST. --- amber/src/main/python/core/models/state.py | 28 +++++++++++--- .../python/core/runnables/network_sender.py | 13 ++++--- .../test_state_materialization_e2e.py | 38 ++++++++++++++++--- .../texera/amber/core/state/StateSpec.scala | 11 +++++- 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 4fce475e499..9491760266e 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -45,6 +45,25 @@ class State(dict): def to_json(self) -> str: return json.dumps(_to_json_value(self), separators=(",", ":")) + @staticmethod + def to_columns( + content_json: str, + loop_counter: int = 0, + loop_start_id: str = "", + loop_start_state_uri: str = "", + ) -> dict: + """The single column-name -> value mapping for the State wire/storage + format. Both ``to_tuple`` (iceberg materialization) and the network + sender build from this, so adding a column is a one-line change here + rather than in every serializer. + """ + return { + State.CONTENT: content_json, + State.LOOP_COUNTER: int(loop_counter), + State.LOOP_START_ID: loop_start_id, + State.LOOP_START_STATE_URI: loop_start_state_uri, + } + def to_tuple( self, loop_counter: int = 0, @@ -52,12 +71,9 @@ def to_tuple( loop_start_state_uri: str = "", ) -> Tuple: return Tuple( - { - State.CONTENT: self.to_json(), - State.LOOP_COUNTER: int(loop_counter), - State.LOOP_START_ID: loop_start_id, - State.LOOP_START_STATE_URI: loop_start_state_uri, - }, + State.to_columns( + self.to_json(), loop_counter, loop_start_id, loop_start_state_uri + ), schema=State.SCHEMA, ) diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py index 68d89e0ebf1..5981a5de819 100644 --- a/amber/src/main/python/core/runnables/network_sender.py +++ b/amber/src/main/python/core/runnables/network_sender.py @@ -105,13 +105,14 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None: self._proxy_client.send_data(bytes(data_header), data_payload.frame) elif isinstance(data_payload, StateFrame): data_header = PythonDataHeader(tag=to, payload_type="State") + columns = State.to_columns( + data_payload.frame.to_json(), + data_payload.loop_counter, + data_payload.loop_start_id, + data_payload.loop_start_state_uri, + ) table = pa.Table.from_pydict( - { - State.CONTENT: [data_payload.frame.to_json()], - State.LOOP_COUNTER: [int(data_payload.loop_counter)], - State.LOOP_START_ID: [data_payload.loop_start_id], - State.LOOP_START_STATE_URI: [data_payload.loop_start_state_uri], - }, + {name: [value] for name, value in columns.items()}, schema=State.SCHEMA.as_arrow_schema(), ) self._proxy_client.send_data(bytes(data_header), table) diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index 43e2959cd95..b22dffe9877 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -82,6 +82,12 @@ def sqlite_iceberg_catalog(): """Inject a sqlite-backed SqlCatalog so the test runs without external iceberg infra (postgres/minio). + Note: the other iceberg-backed tests (e.g. test_iceberg_document.py) use a + postgres/REST catalog to mirror production. This e2e deliberately diverges + to a hermetic sqlite catalog so the writer→storage→reader join can run as a + fast, infra-free unit test -- the materialization logic it exercises is + catalog-agnostic, so the sqlite backend exercises the same code path. + Module-scoped so all tests in this file share one warehouse, and so namespace creation only happens once. We save/restore the original `IcebergCatalogInstance` singleton so other test modules that expect @@ -171,10 +177,17 @@ def test_state_written_by_output_manager_is_replayed_by_reader(): port_id, schema=worker_schema_for_result, storage_uri_base=base_uri ) - # 3. Drive a state through the producer-side path. loop_counter rides - # alongside the State (not inside it) and is materialized as its own column. + # 3. Drive a state through the producer-side path. The loop bookkeeping + # rides alongside the State (not inside it) and is materialized as its own + # set of columns. Use non-default values for all three so a regression in + # any single column's plumbing is caught, not just loop_counter's. state = State({"flag": True, "name": "outer"}) - producer.save_state_to_storage_if_needed(state, 7) + producer.save_state_to_storage_if_needed( + state, + loop_counter=7, + loop_start_id="outer-loop", + loop_start_state_uri="vfs:///wf/outer-state", + ) # 4. Force the writer threads to flush + commit by closing them. # Without this, the iceberg buffer holds the state in memory and @@ -233,6 +246,14 @@ def test_state_written_by_output_manager_is_replayed_by_reader(): f"loop_counter must round-trip through its own column; " f"got {state_frames[0].loop_counter}" ) + assert state_frames[0].loop_start_id == "outer-loop", ( + f"loop_start_id must round-trip through its own column; " + f"got {state_frames[0].loop_start_id!r}" + ) + assert state_frames[0].loop_start_state_uri == "vfs:///wf/outer-state", ( + f"loop_start_state_uri must round-trip through its own column; " + f"got {state_frames[0].loop_start_state_uri!r}" + ) def test_state_table_persists_across_writer_close(): @@ -251,7 +272,12 @@ def test_state_table_persists_across_writer_close(): producer.add_output_port(port_id, schema=State.SCHEMA, storage_uri_base=base_uri) state = State({"flag": False, "checkpoint": 42}) - producer.save_state_to_storage_if_needed(state, 0) + producer.save_state_to_storage_if_needed( + state, + loop_counter=3, + loop_start_id="inner-loop", + loop_start_state_uri="vfs:///wf/inner-state", + ) producer.close_port_storage_writers() # Read directly from the iceberg state document, bypassing the reader. @@ -262,4 +288,6 @@ def test_state_table_persists_across_writer_close(): f"writer was closed; got {len(rows)} rows" ) assert State.from_tuple(rows[0]) == state - assert rows[0][State.LOOP_COUNTER] == 0 + assert rows[0][State.LOOP_COUNTER] == 3 + assert rows[0][State.LOOP_START_ID] == "inner-loop" + assert rows[0][State.LOOP_START_STATE_URI] == "vfs:///wf/inner-state" diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala index 1e6a0dc0178..6677318e4ac 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala @@ -103,12 +103,21 @@ class StateSpec extends AnyFlatSpec { "blob" -> Array[Byte](1, 2) ) ) - val decoded = State.fromTuple(original.toTuple()) + val tuple = original.toTuple(5L, "outer-loop", "vfs:///outer") + + // Content round-trips through fromTuple, which reads only the content column. + val decoded = State.fromTuple(tuple) assert(decoded.values("i") == 3L) assert(decoded.values("label") == "outer") assert( decoded.values("blob").asInstanceOf[Array[Byte]].sameElements(Array[Byte](1, 2)) ) + + // The loop bookkeeping is carried in its own columns (not the content + // JSON, and not surfaced by fromTuple), so assert it off the raw tuple. + assert(tuple.getField[java.lang.Long]("loop_counter").toLong == 5L) + assert(tuple.getField[String]("loop_start_id") == "outer-loop") + assert(tuple.getField[String]("loop_start_state_uri") == "vfs:///outer") } it should "produce a tuple whose payload is the JSON serialization" in { From 9fa4d9ecad8139cea53e931d732ff7d17047f7a4 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 30 Jun 2026 19:02:01 -0700 Subject: [PATCH 4/4] refactor(amber): drop loop_start_state_uri from the State format Per review feedback on #5900: `loop_start_state_uri` is a write address that stays constant across the whole loop -- it's loop config, not per-iteration State data, and the design landed on delivering it to LoopEnd at setup rather than carrying it in State. Rather than ship the dormant column now and remove it later, drop it here. Reduces the materialized State format from 4 columns to 3 (`content`, `loop_counter`, `loop_start_id`). Removes the column and schema entry, the `StateFrame` field, and the transport/serialization plumbing (network sender/receiver, materialization reader, OutputManager save/emit, Scala `toTuple`), plus the corresponding test coverage. `content` / `loop_counter` / `loop_start_id` are unchanged. --- .../core/architecture/packaging/output_manager.py | 5 +---- amber/src/main/python/core/models/payload.py | 7 +++---- amber/src/main/python/core/models/state.py | 9 +-------- .../main/python/core/runnables/network_receiver.py | 1 - .../src/main/python/core/runnables/network_sender.py | 1 - .../input_port_materialization_reader_runnable.py | 1 - .../architecture/packaging/test_output_manager.py | 3 +-- .../packaging/test_state_materialization_e2e.py | 11 ++--------- amber/src/test/python/core/models/test_state.py | 2 -- ...est_input_port_materialization_reader_runnable.py | 6 ------ .../org/apache/texera/amber/core/state/State.scala | 9 +++------ .../apache/texera/amber/core/state/StateSpec.scala | 3 +-- .../storage/result/iceberg/IcebergDocumentSpec.scala | 5 +---- .../apache/texera/amber/util/ArrowUtilsSpec.scala | 12 +++++------- 14 files changed, 18 insertions(+), 57 deletions(-) diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py index 38fb18211bf..90ceda2eb96 100644 --- a/amber/src/main/python/core/architecture/packaging/output_manager.py +++ b/amber/src/main/python/core/architecture/packaging/output_manager.py @@ -192,7 +192,6 @@ def save_state_to_storage_if_needed( state: State, loop_counter: int = 0, loop_start_id: str = "", - loop_start_state_uri: str = "", port_id=None, ) -> None: # When port_id is omitted the same state row is fanned out to @@ -202,7 +201,7 @@ def save_state_to_storage_if_needed( # (and every worker reading the materialization) needs the full # set. element = PortStorageWriterElement( - data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri) + data_tuple=state.to_tuple(loop_counter, loop_start_id) ) if port_id is None: for writer_queue, _, _ in self._port_state_writers.values(): @@ -285,7 +284,6 @@ def emit_state( state: State, loop_counter: int = 0, loop_start_id: str = "", - loop_start_state_uri: str = "", ) -> Iterable[typing.Tuple[ActorVirtualIdentity, DataPayload]]: return chain( *( @@ -297,7 +295,6 @@ def emit_state( payload, loop_counter=loop_counter, loop_start_id=loop_start_id, - loop_start_state_uri=loop_start_state_uri, ) if isinstance(payload, State) else self.tuple_to_frame(payload) diff --git a/amber/src/main/python/core/models/payload.py b/amber/src/main/python/core/models/payload.py index 3f1ec4e7fe3..6fc3580679c 100644 --- a/amber/src/main/python/core/models/payload.py +++ b/amber/src/main/python/core/models/payload.py @@ -38,8 +38,7 @@ class StateFrame(DataPayload): # the State payload (not inside it) so it never collides with user state. # Defaults are the "no loop" values for all non-loop state. loop_counter: int = 0 - # Which LoopStart to jump back to, and the iceberg URI its input is read - # from. Set by the runtime on a LoopStart's output, consumed by the - # matching LoopEnd. Empty for non-loop / not-yet-stamped state. + # Which LoopStart to jump back to. Set by the runtime on a LoopStart's + # output, consumed by the matching LoopEnd. Empty for non-loop / + # not-yet-stamped state. loop_start_id: str = "" - loop_start_state_uri: str = "" diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py index 9491760266e..559d6eca45f 100644 --- a/amber/src/main/python/core/models/state.py +++ b/amber/src/main/python/core/models/state.py @@ -32,13 +32,11 @@ class State(dict): # that need these values read the corresponding columns off the tuple. LOOP_COUNTER = "loop_counter" LOOP_START_ID = "loop_start_id" - LOOP_START_STATE_URI = "loop_start_state_uri" SCHEMA = Schema( raw_schema={ CONTENT: "STRING", LOOP_COUNTER: "LONG", LOOP_START_ID: "STRING", - LOOP_START_STATE_URI: "STRING", } ) @@ -50,7 +48,6 @@ def to_columns( content_json: str, loop_counter: int = 0, loop_start_id: str = "", - loop_start_state_uri: str = "", ) -> dict: """The single column-name -> value mapping for the State wire/storage format. Both ``to_tuple`` (iceberg materialization) and the network @@ -61,19 +58,15 @@ def to_columns( State.CONTENT: content_json, State.LOOP_COUNTER: int(loop_counter), State.LOOP_START_ID: loop_start_id, - State.LOOP_START_STATE_URI: loop_start_state_uri, } def to_tuple( self, loop_counter: int = 0, loop_start_id: str = "", - loop_start_state_uri: str = "", ) -> Tuple: return Tuple( - State.to_columns( - self.to_json(), loop_counter, loop_start_id, loop_start_state_uri - ), + State.to_columns(self.to_json(), loop_counter, loop_start_id), schema=State.SCHEMA, ) diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py index 2c672a07750..c9f64c1fca0 100644 --- a/amber/src/main/python/core/runnables/network_receiver.py +++ b/amber/src/main/python/core/runnables/network_receiver.py @@ -100,7 +100,6 @@ def data_handler(command: bytes, table: Table) -> int: State.from_json(table[State.CONTENT][0].as_py()), loop_counter=int(table[State.LOOP_COUNTER][0].as_py()), loop_start_id=table[State.LOOP_START_ID][0].as_py(), - loop_start_state_uri=table[State.LOOP_START_STATE_URI][0].as_py(), ), "ECM", lambda _: EmbeddedControlMessage().parse(table["payload"][0].as_py()), diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py index 5981a5de819..3dcffb98b75 100644 --- a/amber/src/main/python/core/runnables/network_sender.py +++ b/amber/src/main/python/core/runnables/network_sender.py @@ -109,7 +109,6 @@ def _send_data(self, to: ChannelIdentity, data_payload: DataPayload) -> None: data_payload.frame.to_json(), data_payload.loop_counter, data_payload.loop_start_id, - data_payload.loop_start_state_uri, ) table = pa.Table.from_pydict( {name: [value] for name, value in columns.items()}, diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py index 97a24fe073d..4bc4e68539f 100644 --- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py +++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py @@ -164,7 +164,6 @@ def run(self) -> None: State.from_tuple(state_row), loop_counter=state_row[State.LOOP_COUNTER], loop_start_id=state_row[State.LOOP_START_ID], - loop_start_state_uri=state_row[State.LOOP_START_STATE_URI], ) ) diff --git a/amber/src/test/python/core/architecture/packaging/test_output_manager.py b/amber/src/test/python/core/architecture/packaging/test_output_manager.py index d132bd9b623..3eae42e9c19 100644 --- a/amber/src/test/python/core/architecture/packaging/test_output_manager.py +++ b/amber/src/test/python/core/architecture/packaging/test_output_manager.py @@ -109,7 +109,7 @@ def test_close_port_storage_writers_stops_state_threads( def test_defaults_loop_columns_when_omitted(self, output_manager, state, port_a): # Dormancy: callers that pass no loop bookkeeping (every non-loop # caller, e.g. MainLoop.process_input_state) still produce a valid - # 4-column state tuple with the loop columns at their no-loop defaults. + # 3-column state tuple with the loop columns at their no-loop defaults. queue_a, _, _ = _stub_state_writer(output_manager, port_a) output_manager.save_state_to_storage_if_needed(state) # no loop_counter @@ -117,4 +117,3 @@ def test_defaults_loop_columns_when_omitted(self, output_manager, state, port_a) data_tuple = queue_a.put.call_args.args[0].data_tuple assert data_tuple[State.LOOP_COUNTER] == 0 assert data_tuple[State.LOOP_START_ID] == "" - assert data_tuple[State.LOOP_START_STATE_URI] == "" diff --git a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py index b22dffe9877..b77bdca3f54 100644 --- a/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py +++ b/amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py @@ -179,14 +179,13 @@ def test_state_written_by_output_manager_is_replayed_by_reader(): # 3. Drive a state through the producer-side path. The loop bookkeeping # rides alongside the State (not inside it) and is materialized as its own - # set of columns. Use non-default values for all three so a regression in - # any single column's plumbing is caught, not just loop_counter's. + # set of columns. Use non-default values for both so a regression in + # either column's plumbing is caught, not just loop_counter's. state = State({"flag": True, "name": "outer"}) producer.save_state_to_storage_if_needed( state, loop_counter=7, loop_start_id="outer-loop", - loop_start_state_uri="vfs:///wf/outer-state", ) # 4. Force the writer threads to flush + commit by closing them. @@ -250,10 +249,6 @@ def test_state_written_by_output_manager_is_replayed_by_reader(): f"loop_start_id must round-trip through its own column; " f"got {state_frames[0].loop_start_id!r}" ) - assert state_frames[0].loop_start_state_uri == "vfs:///wf/outer-state", ( - f"loop_start_state_uri must round-trip through its own column; " - f"got {state_frames[0].loop_start_state_uri!r}" - ) def test_state_table_persists_across_writer_close(): @@ -276,7 +271,6 @@ def test_state_table_persists_across_writer_close(): state, loop_counter=3, loop_start_id="inner-loop", - loop_start_state_uri="vfs:///wf/inner-state", ) producer.close_port_storage_writers() @@ -290,4 +284,3 @@ def test_state_table_persists_across_writer_close(): assert State.from_tuple(rows[0]) == state assert rows[0][State.LOOP_COUNTER] == 3 assert rows[0][State.LOOP_START_ID] == "inner-loop" - assert rows[0][State.LOOP_START_STATE_URI] == "vfs:///wf/inner-state" diff --git a/amber/src/test/python/core/models/test_state.py b/amber/src/test/python/core/models/test_state.py index b51a01267bb..b1c9fad35a6 100644 --- a/amber/src/test/python/core/models/test_state.py +++ b/amber/src/test/python/core/models/test_state.py @@ -31,14 +31,12 @@ def test_class_attributes(self): assert State.CONTENT == "content" assert State.LOOP_COUNTER == "loop_counter" assert State.LOOP_START_ID == "loop_start_id" - assert State.LOOP_START_STATE_URI == "loop_start_state_uri" # The loop-control columns are runtime-owned bookkeeping, sibling to # content, not part of the user state JSON. assert State.SCHEMA.get_attr_names() == [ "content", "loop_counter", "loop_start_id", - "loop_start_state_uri", ] def test_json_round_trip_primitives(self): diff --git a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py index 59635d0e872..288ee6374b3 100644 --- a/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py +++ b/amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py @@ -70,12 +70,10 @@ def test_state_rows_are_emitted_as_state_frames(self, runnable): row_a = { State.LOOP_COUNTER: 0, State.LOOP_START_ID: "loop-a", - State.LOOP_START_STATE_URI: "vfs:///a", } row_b = { State.LOOP_COUNTER: 1, State.LOOP_START_ID: "loop-b", - State.LOOP_START_STATE_URI: "vfs:///b", } result_doc = MagicMock() result_doc.get.return_value = iter([]) # No materialized tuples. @@ -109,8 +107,4 @@ def test_state_rows_are_emitted_as_state_frames(self, runnable): assert [sf.payload.frame for sf in state_frames] == [state_a, state_b] assert [sf.payload.loop_counter for sf in state_frames] == [0, 1] assert [sf.payload.loop_start_id for sf in state_frames] == ["loop-a", "loop-b"] - assert [sf.payload.loop_start_state_uri for sf in state_frames] == [ - "vfs:///a", - "vfs:///b", - ] assert runnable._finished is True diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala index 92103477d1d..f2896c8e19b 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala @@ -33,12 +33,11 @@ final case class State(values: Map[String, Any]) { def toTuple( loopCounter: Long = 0L, - loopStartId: String = "", - loopStartStateUri: String = "" + loopStartId: String = "" ): Tuple = Tuple .builder(State.schema) - .addSequentially(Array(toJson, Long.box(loopCounter), loopStartId, loopStartStateUri)) + .addSequentially(Array(toJson, Long.box(loopCounter), loopStartId)) .build() } @@ -50,7 +49,6 @@ object State { // Python-only), so toTuple defaults these to the "no loop" values. private val LoopCounter = "loop_counter" private val LoopStartId = "loop_start_id" - private val LoopStartStateUri = "loop_start_state_uri" private val BytesTypeMarker = "__texera_type__" private val BytesValue = "bytes" private val PayloadMarker = "payload" @@ -58,8 +56,7 @@ object State { val schema: Schema = new Schema( new Attribute(Content, AttributeType.STRING), new Attribute(LoopCounter, AttributeType.LONG), - new Attribute(LoopStartId, AttributeType.STRING), - new Attribute(LoopStartStateUri, AttributeType.STRING) + new Attribute(LoopStartId, AttributeType.STRING) ) def fromJson(payload: String): State = diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala index 6677318e4ac..f0af0b30c8a 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala @@ -103,7 +103,7 @@ class StateSpec extends AnyFlatSpec { "blob" -> Array[Byte](1, 2) ) ) - val tuple = original.toTuple(5L, "outer-loop", "vfs:///outer") + val tuple = original.toTuple(5L, "outer-loop") // Content round-trips through fromTuple, which reads only the content column. val decoded = State.fromTuple(tuple) @@ -117,7 +117,6 @@ class StateSpec extends AnyFlatSpec { // JSON, and not surfaced by fromTuple), so assert it off the raw tuple. assert(tuple.getField[java.lang.Long]("loop_counter").toLong == 5L) assert(tuple.getField[String]("loop_start_id") == "outer-loop") - assert(tuple.getField[String]("loop_start_state_uri") == "vfs:///outer") } it should "produce a tuple whose payload is the JSON serialization" in { diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 8f52b0eceec..e77dbbe9bf7 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -220,9 +220,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter val writer = stateDocument.writer(UUID.randomUUID().toString) writer.open() - writer.putOne( - state.toTuple(loopCounter = 7L, loopStartId = "ls", loopStartStateUri = "vfs:///outer") - ) + writer.putOne(state.toTuple(loopCounter = 7L, loopStartId = "ls")) writer.close() val storedRows = stateDocument.get().toList @@ -230,7 +228,6 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter // Loop bookkeeping is materialized as its own columns, not in the content JSON. assert(storedRows.head.getField[java.lang.Long]("loop_counter").toLong == 7L) assert(storedRows.head.getField[String]("loop_start_id") == "ls") - assert(storedRows.head.getField[String]("loop_start_state_uri") == "vfs:///outer") // User state round-trips through the content column (fromTuple reads only content). val deserialized = State.fromTuple(storedRows.head).values assert(deserialized("i") == 3L) diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala index 486deefbcc2..4321008d46b 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala @@ -295,11 +295,11 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers { // The schema-only round-trip tests above don't exercise the per-row data // encode/decode, so a column dropped or mistyped there would slip through. // Pin that the full multi-column State tuple (content STRING + the - // loop-control columns loop_counter LONG, loop_start_id / loop_start_state_uri - // STRING) survives a real setTexeraTuple -> Arrow vectors -> getTexeraTuple - // round-trip with every column intact -- the property the wire hop relies on. + // loop-control columns loop_counter LONG, loop_start_id STRING) survives a + // real setTexeraTuple -> Arrow vectors -> getTexeraTuple round-trip with + // every column intact -- the property the wire hop relies on. val original = - State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L, "outer-loop", "vfs:///outer") + State(Map("i" -> 5L, "label" -> "outer")).toTuple(3L, "outer-loop") val allocator = new RootAllocator() val root = VectorSchemaRoot.create(ArrowUtils.fromTexeraSchema(original.getSchema), allocator) @@ -315,15 +315,13 @@ class ArrowUtilsSpec extends AnyFlatSpec with Matchers { List( ("content", AttributeType.STRING), ("loop_counter", AttributeType.LONG), - ("loop_start_id", AttributeType.STRING), - ("loop_start_state_uri", AttributeType.STRING) + ("loop_start_id", AttributeType.STRING) ) // content (the user State JSON) round-trips... State.fromTuple(recovered).values shouldBe Map("i" -> 5L, "label" -> "outer") // ...and so do the loop-control columns. recovered.getField[java.lang.Long]("loop_counter").toLong shouldBe 3L recovered.getField[String]("loop_start_id") shouldBe "outer-loop" - recovered.getField[String]("loop_start_state_uri") shouldBe "vfs:///outer" } finally { root.close() allocator.close()