diff --git a/amber/src/main/python/core/util/virtual_identity.py b/amber/src/main/python/core/util/virtual_identity.py index 93a887d7c68..49da75fcd58 100644 --- a/amber/src/main/python/core/util/virtual_identity.py +++ b/amber/src/main/python/core/util/virtual_identity.py @@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity) -> str: Expected format: ``(logicalOpId=,layerName=, portId=,isInternal=,isInput=)`` + + Raises ValueError if `logicalOpId` or `layerName` contains an underscore + (VFS URI parsing relies on the absence of '_'), or if `portId` is negative. """ logical_op_id = obj.op_id.logical_op_id.id layer_name = obj.op_id.layer_name port_id = obj.port_id.id is_internal = obj.port_id.internal is_input_port = obj.input + if "_" in logical_op_id: + raise ValueError( + f"logicalOpId must not contain '_' " + f"(VFS URI parsing relies on this): {logical_op_id}" + ) + if "_" in layer_name: + raise ValueError( + f"layerName must not contain '_' " + f"(VFS URI parsing relies on this): {layer_name}" + ) + if port_id < 0: + raise ValueError(f"portId must be non-negative: {port_id}") return ( f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id}," f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})" @@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) -> GlobalPortIdentity: match.groups() ) port_id = int(port_id_str) + if port_id < 0: + raise ValueError(f"portId must be non-negative: {port_id}") is_internal = is_internal_str.lower() == "true" is_input_port = is_input_str.lower() == "true" op_id = PhysicalOpIdentity( diff --git a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py index a218c64a2d8..327b9073063 100644 --- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py +++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py @@ -86,7 +86,7 @@ def iceberg_document(self, amber_schema): ExecutionIdentity(id=0), GlobalPortIdentity( op_id=PhysicalOpIdentity( - logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"), + logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"), layer_name="main", ), port_id=PortIdentity(id=0), diff --git a/amber/src/test/python/core/util/test_virtual_identity.py b/amber/src/test/python/core/util/test_virtual_identity.py index 431dd84146f..c2f6f636850 100644 --- a/amber/src/test/python/core/util/test_virtual_identity.py +++ b/amber/src/test/python/core/util/test_virtual_identity.py @@ -105,6 +105,21 @@ def test_round_trips_through_deserialize(self): assert recovered.port_id.internal is True assert recovered.input is False + def test_rejects_underscore_in_logical_op_id(self): + # VFS-compatibility contract: serialized output must be + # underscore-free. Fail fast at the boundary on underscored input. + with pytest.raises(ValueError, match="logicalOpId must not contain"): + serialize_global_port_identity(_gpi(op_id="__DummyOperator")) + + def test_rejects_underscore_in_layer_name(self): + with pytest.raises(ValueError, match="layerName must not contain"): + serialize_global_port_identity(_gpi(layer="main_source_0_op")) + + def test_rejects_negative_port_id(self): + # Port ids are array indices and must be non-negative. + with pytest.raises(ValueError, match="portId must be non-negative"): + serialize_global_port_identity(_gpi(port=-1)) + class TestDeserializeGlobalPortIdentity: def test_parses_canonical_encoded_string(self): @@ -137,6 +152,14 @@ def test_raises_value_error_on_missing_field(self): "(logicalOpId=op,layerName=l,portId=0,isInternal=true)" ) + def test_raises_value_error_on_negative_port_id(self): + # Symmetric with the serializer: tampered URIs with a negative + # portId must be rejected on the way back in. + with pytest.raises(ValueError, match="portId must be non-negative"): + deserialize_global_port_identity( + "(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)" + ) + class TestGetFromActorIdForInputPortStorage: def test_prefixes_materialization_reader_to_uri_plus_actor_name(self): diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala index 48c087443c1..c8fd8e1a363 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/serde/GlobalPortIdentitySerde.scala @@ -23,17 +23,20 @@ import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity} /** * Serialize and deserializes a GlobalPortIdentity object to a string using a custom, human-readable format - * to ensure it works with both URI and file path and does not incldue underscore "_" so that it does not - * interfere with our own VFS URI parsing. + * to ensure it works with both URI and file path and does not include underscore "_" so that it does not + * interfere with our own VFS URI parsing. Underscores in `logicalOpId` / `layerName` and negative `portId` + * values are rejected with `IllegalArgumentException`. */ object GlobalPortIdentitySerde { implicit class SerdeOps(globalPortId: GlobalPortIdentity) { /** * Serializes a GlobalPortIdentity object into a string using our custom, human-readable format - * that works with both URI and file path and does not incldue underscore "_" so that it does not + * that works with both URI and file path and does not include underscore "_" so that it does not * interfere with our own VFS URI parsing. * + * @throws java.lang.IllegalArgumentException if `logicalOpId` or `layerName` contains an underscore, + * or if `portId.id` is negative. * @return A serialized string representation of globalPortId */ def serializeAsString: String = { @@ -42,6 +45,15 @@ object GlobalPortIdentitySerde { val portId = globalPortId.portId.id val isInternal = globalPortId.portId.internal val isInput = globalPortId.input + require( + !logicalOpId.contains('_'), + s"logicalOpId must not contain '_' (VFS URI parsing relies on this): $logicalOpId" + ) + require( + !layerName.contains('_'), + s"layerName must not contain '_' (VFS URI parsing relies on this): $layerName" + ) + require(portId >= 0, s"portId must be non-negative: $portId") s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)" } } @@ -58,6 +70,7 @@ object GlobalPortIdentitySerde { serializedGlobalPortId match { case pattern(logicalOpId, layerName, portIdStr, isInternalStr, isInputStr) => val portIdInt = portIdStr.toInt + require(portIdInt >= 0, s"portId must be non-negative: $portIdInt") val isInternal = isInternalStr.toBoolean val isInput = isInputStr.toBoolean val physicalOpId = PhysicalOpIdentity( diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala index 8fdf039f3ea..eb259fed582 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala @@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter GlobalPortIdentity( PhysicalOpIdentity( logicalOpId = - OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"), + OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"), layerName = "main" ), PortIdentity() diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala index 175ebc2c01b..b7cf776eb82 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala @@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with BeforeAndAfterAll with Suit GlobalPortIdentity( PhysicalOpIdentity( logicalOpId = - OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"), + OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"), layerName = "main" ), PortIdentity() diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala new file mode 100644 index 00000000000..89a815c39e3 --- /dev/null +++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/util/serde/PortIdentitySerdeSpec.scala @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.util.serde + +import org.apache.texera.amber.core.virtualidentity.{OperatorIdentity, PhysicalOpIdentity} +import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity} +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps +import org.scalatest.flatspec.AnyFlatSpec + +class PortIdentitySerdeSpec extends AnyFlatSpec { + + // --------------------------------------------------------------------------- + // GlobalPortIdentitySerde + // --------------------------------------------------------------------------- + + private def globalPort( + logical: String = "op-A", + layer: String = "main", + portIdValue: Int = 0, + internal: Boolean = false, + input: Boolean = true + ): GlobalPortIdentity = + GlobalPortIdentity( + opId = PhysicalOpIdentity(OperatorIdentity(logical), layer), + portId = PortIdentity(id = portIdValue, internal = internal), + input = input + ) + + "GlobalPortIdentitySerde" should "round-trip a default GlobalPortIdentity through serializeAsString → deserializeFromString" in { + val original = globalPort() + val restored = GlobalPortIdentitySerde.deserializeFromString(original.serializeAsString) + assert(restored == original) + } + + it should "preserve all five fields independently across the round-trip" in { + // Vary each field individually so a regression that swapped two fields + // (e.g., isInput / isInternal) would surface here, not as a general + // round-trip failure. + val cases = Seq( + globalPort(logical = "op-A"), + globalPort(logical = "op-Z"), + globalPort(layer = "main"), + globalPort(layer = "extra-layer"), + globalPort(portIdValue = 0), + globalPort(portIdValue = 7), + globalPort(internal = false), + globalPort(internal = true), + globalPort(input = true), + globalPort(input = false) + ) + cases.foreach { p => + val s = p.serializeAsString + val restored = GlobalPortIdentitySerde.deserializeFromString(s) + assert(restored == p, s"round-trip mismatch for $p (serialized: $s)") + } + } + + it should "produce the documented format for default and non-default values" in { + // Pin the exact format. If this changes, callers reading existing + // VFS URIs from disk will break — locking it down forces a deliberate + // migration story. + assert( + globalPort().serializeAsString == + "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true)" + ) + assert( + globalPort( + logical = "op-Z", + layer = "extra-layer", + portIdValue = 7, + internal = true, + input = false + ).serializeAsString == + "(logicalOpId=op-Z,layerName=extra-layer,portId=7,isInternal=true,isInput=false)" + ) + } + + it should "round-trip identifiers containing dashes and dots (regex non-comma matcher)" in { + // The deserialization regex uses `[^,]+` for the field body, so any + // non-comma character is fair game. Cover the realistic counter- + // examples (dashes, dots) since logical op ids and layer names use + // both; if the regex were ever tightened to alphanumerics only, this + // would fail on purpose. + val p = globalPort(logical = "my.op-with-dashes.v2", layer = "main-1") + assert(GlobalPortIdentitySerde.deserializeFromString(p.serializeAsString) == p) + } + + it should "throw IllegalArgumentException when serializing a negative port id" in { + // Port ids are array indices and must be non-negative; the serializer + // rejects negatives so corrupt data can't reach VFS URIs. + intercept[IllegalArgumentException] { + globalPort(portIdValue = -1).serializeAsString + } + } + + it should "throw IllegalArgumentException when deserializing a negative port id" in { + // Symmetric: a hand-crafted string with a negative portId must be + // rejected by the deserializer too (so tampered URIs don't slip + // through). + val malformed = "(logicalOpId=op-A,layerName=main,portId=-1,isInternal=false,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "throw IllegalArgumentException when the input has the wrong field order" in { + // The regex pins the documented field order; a swapped order should + // not silently parse with confused values. + val swapped = "(layerName=main,logicalOpId=op-A,portId=0,isInternal=false,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(swapped) + } + } + + it should "throw IllegalArgumentException when the input has trailing content past the closing paren" in { + val withTrailing = + "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false,isInput=true) extra" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(withTrailing) + } + } + + it should "throw IllegalArgumentException when a field body is empty" in { + // `[^,]+` requires at least one character, so an empty layerName + // (`,layerName=,`) must fail to match. + val emptyLayer = "(logicalOpId=op-A,layerName=,portId=0,isInternal=false,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(emptyLayer) + } + } + + it should "throw IllegalArgumentException on a completely malformed string" in { + val ex = intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString("not even close") + } + assert(ex.getMessage.contains("not even close")) + } + + it should "throw IllegalArgumentException when a required field is missing" in { + // Drop isInput. + val malformed = "(logicalOpId=op-A,layerName=main,portId=0,isInternal=false)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "throw NumberFormatException when portId is non-numeric" in { + // The regex matches (`[^,]+`) but `.toInt` fails. NumberFormatException + // extends IllegalArgumentException; assert the more specific type so a + // regression that swallowed/rewrapped it is visible. + val malformed = "(logicalOpId=op-A,layerName=main,portId=NaN,isInternal=false,isInput=true)" + intercept[NumberFormatException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "throw IllegalArgumentException when a boolean field is non-boolean" in { + // `String.toBoolean` is strict: only \"true\" / \"false\" (case-insensitive) + // pass; anything else throws IllegalArgumentException. + val malformed = "(logicalOpId=op-A,layerName=main,portId=0,isInternal=maybe,isInput=true)" + intercept[IllegalArgumentException] { + GlobalPortIdentitySerde.deserializeFromString(malformed) + } + } + + it should "use no underscore in its own format characters (separators / keys)" in { + // Pin the format-character invariant: the wrapping `(...)`, the field + // separators `,`, the key=value separators, and the field NAMES + // themselves contain no underscore. Verify by building the format with + // empty-string-replacement values for every input field, so anything + // left in the output is purely from `serializeAsString`'s own format. + // (For the layerName field the empty-input variant is rejected by the + // deserializer regex; here we only check the SERIALIZED output, not the + // round-trip.) + val s = globalPort(logical = "x", layer = "x").serializeAsString + val formatChars = s.replace("x", "").replace("0", "").replace("false", "").replace("true", "") + assert(!formatChars.contains("_"), s"format characters must be underscore-free: $formatChars") + } + + it should "throw IllegalArgumentException when logicalOpId contains an underscore" in { + // Enforces the documented VFS-compatibility contract: the serialized + // form must be underscore-free. The serializer rejects underscored + // inputs at the boundary instead of silently emitting a string that + // would interfere with VFS URI parsing downstream. + intercept[IllegalArgumentException] { + globalPort(logical = "__DummyOperator").serializeAsString + } + } + + it should "throw IllegalArgumentException when layerName contains an underscore" in { + // Both fields enforce the same invariant; cover them independently so + // a partial fix that only validates one surfaces as a test failure. + intercept[IllegalArgumentException] { + globalPort(layer = "main_source_0_op").serializeAsString + } + } + + // --------------------------------------------------------------------------- + // PortIdentityKeySerializer.portIdToString (companion, not the Jackson class) + // --------------------------------------------------------------------------- + + "PortIdentityKeySerializer.portIdToString" should "format a PortIdentity as `id_internal`" in { + assert(PortIdentityKeySerializer.portIdToString(PortIdentity(0, internal = false)) == "0_false") + assert(PortIdentityKeySerializer.portIdToString(PortIdentity(7, internal = true)) == "7_true") + } + + // --------------------------------------------------------------------------- + // PortIdentityKeySerializer + PortIdentityKeyDeserializer (Jackson wiring) + // --------------------------------------------------------------------------- + // + // These tests use the production `JSONUtils.objectMapper` directly so a + // regression in the singleton wiring (e.g. the module that registers the + // PortIdentity key (de)serializer being removed or reordered) surfaces + // here, not just on a freshly-constructed mapper. + + "PortIdentity Jackson key (de)serialization" should "round-trip a Map[PortIdentity, String] via JSONUtils.objectMapper" in { + val original = Map( + PortIdentity(0, internal = false) -> "a", + PortIdentity(1, internal = true) -> "b" + ) + val json = objectMapper.writeValueAsString(original) + // Verify the JSON keys match the documented `id_internal` format. + assert(json.contains("\"0_false\"")) + assert(json.contains("\"1_true\"")) + val tref = objectMapper.getTypeFactory + .constructMapType(classOf[java.util.HashMap[_, _]], classOf[PortIdentity], classOf[String]) + val restored: java.util.Map[PortIdentity, String] = objectMapper.readValue(json, tref) + import scala.jdk.CollectionConverters._ + assert(restored.asScala.toMap == original) + } + + it should "round-trip an empty Map[PortIdentity, V] without invoking the (de)serializer" in { + val original = Map.empty[PortIdentity, String] + val json = objectMapper.writeValueAsString(original) + val tref = objectMapper.getTypeFactory + .constructMapType(classOf[java.util.HashMap[_, _]], classOf[PortIdentity], classOf[String]) + val restored: java.util.Map[PortIdentity, String] = objectMapper.readValue(json, tref) + assert(restored.isEmpty) + } + + "PortIdentityKeyDeserializer.deserializeKey" should "throw NumberFormatException for a non-integer id" in { + val d = new PortIdentityKeyDeserializer + intercept[NumberFormatException] { + d.deserializeKey("notAnInt_false", null) + } + } + + it should "throw IllegalArgumentException for a non-boolean internal flag" in { + val d = new PortIdentityKeyDeserializer + intercept[IllegalArgumentException] { + d.deserializeKey("0_notABool", null) + } + } + + it should "throw NumberFormatException when the underscore separator is missing and the whole string is non-numeric" in { + // `key.split("_")` on a separator-less non-numeric string yields a + // single-element array, and `parts(0).toInt` fires first → NFE. + val d = new PortIdentityKeyDeserializer + intercept[NumberFormatException] { + d.deserializeKey("missingSeparator", null) + } + } + + it should "throw ArrayIndexOutOfBoundsException when only the id is provided (no `_internal` suffix)" in { + // Different separator-missing path: `\"5\".split(\"_\")` yields + // [\"5\"], parts(0).toInt = 5 succeeds, then parts(1) reads past the + // end. Pin this failure mode explicitly so a future safer parser + // breaks the spec on purpose (and the safer error type is chosen + // consciously). + val d = new PortIdentityKeyDeserializer + intercept[ArrayIndexOutOfBoundsException] { + d.deserializeKey("5", null) + } + } + + it should "silently accept extra trailing underscore-separated segments (lenient parser, current behavior)" in { + // Pin the current lenient behavior: `parts(0).toInt` and + // `parts(1).toBoolean` ignore everything past `parts(1)`, so a key + // like `"1_true_garbage"` deserializes to `PortIdentity(1, true)` + // without complaint. The strict-rejection variant lives in a + // pendingUntilFixed test below; characterizing today's lenient + // path here means a future-tightening fix would need to update + // both tests deliberately. + val d = new PortIdentityKeyDeserializer + val pid = d.deserializeKey("1_true_garbage", null) + assert(pid == PortIdentity(1, internal = true)) + } + + it should "eventually reject keys with extra trailing segments (pendingUntilFixed)" in pendingUntilFixed { + // Documented contract: a `PortIdentityKeySerializer` output is exactly + // `id_internal` — two underscore-separated segments. Anything else is + // corrupt JSON and should be rejected, not silently truncated. The + // current implementation is lenient (see characterization test + // above); this pendingUntilFixed flips to passing once the parser + // is hardened, then `pendingUntilFixed` inverts that into a + // deliberate failure forcing the marker to be removed. + val d = new PortIdentityKeyDeserializer + intercept[IllegalArgumentException] { + d.deserializeKey("1_true_garbage", null) + } + } +}