Skip to content
Merged
17 changes: 17 additions & 0 deletions amber/src/main/python/core/util/virtual_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,27 @@ def serialize_global_port_identity(obj: GlobalPortIdentity) -> str:
Expected format:
``(logicalOpId=<logicalOpId>,layerName=<layerName>,
portId=<portId.id>,isInternal=<portId.internal>,isInput=<input>)``

Raises ValueError if `logicalOpId` or `layerName` contains an underscore
(VFS URI parsing relies on the absence of '_'), or if `portId` is negative.
"""
logical_op_id = obj.op_id.logical_op_id.id
layer_name = obj.op_id.layer_name
port_id = obj.port_id.id
is_internal = obj.port_id.internal
is_input_port = obj.input
if "_" in logical_op_id:
raise ValueError(
f"logicalOpId must not contain '_' "
f"(VFS URI parsing relies on this): {logical_op_id}"
)
if "_" in layer_name:
raise ValueError(
f"layerName must not contain '_' "
f"(VFS URI parsing relies on this): {layer_name}"
)
if port_id < 0:
raise ValueError(f"portId must be non-negative: {port_id}")
return (
f"(logicalOpId={logical_op_id},layerName={layer_name},portId={port_id},"
f"isInternal={str(is_internal).lower()},isInput={str(is_input_port).lower()})"
Expand All @@ -72,6 +87,8 @@ def deserialize_global_port_identity(encoded_str: str) -> GlobalPortIdentity:
match.groups()
)
port_id = int(port_id_str)
if port_id < 0:
raise ValueError(f"portId must be non-negative: {port_id}")
is_internal = is_internal_str.lower() == "true"
is_input_port = is_input_str.lower() == "true"
op_id = PhysicalOpIdentity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def iceberg_document(self, amber_schema):
ExecutionIdentity(id=0),
GlobalPortIdentity(
op_id=PhysicalOpIdentity(
logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
logical_op_id=OperatorIdentity(id=f"test-table-{operator_uuid}"),
layer_name="main",
),
port_id=PortIdentity(id=0),
Expand Down
23 changes: 23 additions & 0 deletions amber/src/test/python/core/util/test_virtual_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ def test_round_trips_through_deserialize(self):
assert recovered.port_id.internal is True
assert recovered.input is False

def test_rejects_underscore_in_logical_op_id(self):
    """Serializing an identity whose logical operator id has '_' raises."""
    # The serialized form has to stay underscore-free for VFS
    # compatibility, so an underscored operator id must be refused at
    # the serialization boundary instead of being emitted.
    expected_message = "logicalOpId must not contain"
    with pytest.raises(ValueError, match=expected_message):
        serialize_global_port_identity(_gpi(op_id="__DummyOperator"))

def test_rejects_underscore_in_layer_name(self):
    """Serializing an identity whose layer name has '_' raises."""
    # Same check as for the logical operator id, applied to the layer
    # name component of the identity.
    expected_message = "layerName must not contain"
    with pytest.raises(ValueError, match=expected_message):
        serialize_global_port_identity(_gpi(layer="main_source_0_op"))

def test_rejects_negative_port_id(self):
    """Serializing an identity with a negative port id raises."""
    # A port id is used as an array index, so anything below zero is
    # invalid and must not be serialized.
    expected_message = "portId must be non-negative"
    with pytest.raises(ValueError, match=expected_message):
        serialize_global_port_identity(_gpi(port=-1))


class TestDeserializeGlobalPortIdentity:
def test_parses_canonical_encoded_string(self):
Expand Down Expand Up @@ -137,6 +152,14 @@ def test_raises_value_error_on_missing_field(self):
"(logicalOpId=op,layerName=l,portId=0,isInternal=true)"
)

def test_raises_value_error_on_negative_port_id(self):
    """Deserializing an encoded identity with a negative portId raises."""
    # Mirrors the serializer-side check: an encoded string that carries
    # a negative portId (e.g. a tampered URI) must be rejected when it
    # is parsed back.
    encoded = "(logicalOpId=op,layerName=l,portId=-1,isInternal=false,isInput=true)"
    expected_message = "portId must be non-negative"
    with pytest.raises(ValueError, match=expected_message):
        deserialize_global_port_identity(encoded)


class TestGetFromActorIdForInputPortStorage:
def test_prefixes_materialization_reader_to_uri_plus_actor_name(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}

/**
* Serializes and deserializes a GlobalPortIdentity object to and from a string using a custom, human-readable format
* to ensure it works with both URI and file path and does not incldue underscore "_" so that it does not
* interfere with our own VFS URI parsing.
* to ensure it works with both URI and file path and does not include underscore "_" so that it does not
* interfere with our own VFS URI parsing. Underscores in `logicalOpId` / `layerName` and negative `portId`
* values are rejected with `IllegalArgumentException`.
*/
object GlobalPortIdentitySerde {
implicit class SerdeOps(globalPortId: GlobalPortIdentity) {

/**
* Serializes a GlobalPortIdentity object into a string using our custom, human-readable format
* that works with both URI and file path and does not incldue underscore "_" so that it does not
* that works with both URI and file path and does not include underscore "_" so that it does not
* interfere with our own VFS URI parsing.
*
* @throws java.lang.IllegalArgumentException if `logicalOpId` or `layerName` contains an underscore,
* or if `portId.id` is negative.
* @return A serialized string representation of globalPortId
*/
def serializeAsString: String = {
Expand All @@ -42,6 +45,15 @@ object GlobalPortIdentitySerde {
val portId = globalPortId.portId.id
val isInternal = globalPortId.portId.internal
val isInput = globalPortId.input
require(
!logicalOpId.contains('_'),
s"logicalOpId must not contain '_' (VFS URI parsing relies on this): $logicalOpId"
)
require(
!layerName.contains('_'),
s"layerName must not contain '_' (VFS URI parsing relies on this): $layerName"
)
require(portId >= 0, s"portId must be non-negative: $portId")
s"(logicalOpId=$logicalOpId,layerName=$layerName,portId=$portId,isInternal=$isInternal,isInput=$isInput)"
}
}
Expand All @@ -58,6 +70,7 @@ object GlobalPortIdentitySerde {
serializedGlobalPortId match {
case pattern(logicalOpId, layerName, portIdStr, isInternalStr, isInputStr) =>
val portIdInt = portIdStr.toInt
require(portIdInt >= 0, s"portId must be non-negative: $portIdInt")
val isInternal = isInternalStr.toBoolean
val isInput = isInputStr.toBoolean
val physicalOpId = PhysicalOpIdentity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
GlobalPortIdentity(
PhysicalOpIdentity(
logicalOpId =
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
layerName = "main"
),
PortIdentity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class IcebergTableStatsSpec extends AnyFlatSpec with BeforeAndAfterAll with Suit
GlobalPortIdentity(
PhysicalOpIdentity(
logicalOpId =
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
OperatorIdentity(s"test-table-${UUID.randomUUID().toString.replace("-", "")}"),
layerName = "main"
),
PortIdentity()
Expand Down
Loading
Loading