Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion orchestrator/core/discoveryspace/space.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Copyright IBM Corporation 2025, 2026
# SPDX-License-Identifier: MIT

import contextlib
import logging
import os
import typing
from collections.abc import Callable
from collections.abc import Callable, Iterator
from functools import wraps
from typing import Any

Expand All @@ -21,6 +22,7 @@
DiscoverySpaceConfiguration,
DiscoverySpaceProperties,
)
from orchestrator.core.operation.config import DiscoveryOperationEnum
from orchestrator.core.operation.resource import OperationResource
from orchestrator.core.resources import CoreResourceKinds
from orchestrator.metastore.project import ProjectContext
Expand Down Expand Up @@ -48,6 +50,9 @@

moduleLogger = logging.getLogger("discoveryspace")

SCRIPT_OPERATION_EXECUTION_LABEL = "script"
SCRIPT_OPERATION_LABEL_KEY = "execution"


def _perform_preflight_checks_for_sample_store_methods(
f: Callable[..., Any], # noqa: ANN401
Expand Down Expand Up @@ -885,6 +890,109 @@ def updateOperation(
self.log.info(f"Updating run {operationResource.identifier}")
return self._metadataStore.updateResource(operationResource)

@contextlib.contextmanager
def operation_context(
self,
name: str,
description: str | None = None,
metadata: dict | None = None,
operation_type: DiscoveryOperationEnum = DiscoveryOperationEnum.SEARCH,
) -> Iterator[str]:
"""Context manager that registers a script operation and manages its lifecycle.

Creates an OperationResource linked to this space, appends STARTED before
yielding the operation_id, and writes FINISHED/SUCCESS or FINISHED/FAIL on exit.
The operation_id should be passed as ``requesterid`` to Actuators execute
or submit methods

Args:
name: Human-readable script name stored in the operation configuration.
description: Optional description for the operation metadata.
metadata: Optional extra metadata fields merged into ConfigurationMetadata.
Comment thread
michael-johnston marked this conversation as resolved.
operation_type: Semantic type for the operation (e.g. SEARCH for explore scripts).
Script provenance is always recorded on metadata labels under
``execution: script``.

Yields:
The operation resource identifier.

Raises:
RuntimeError: If the discovery space has no metadata store.
"""
if self._metadataStore is None:
raise RuntimeError(
"DiscoverySpace.operation_context requires a metadata store; "
"load the space from stored configuration first."
)

from orchestrator.core.metadata import ConfigurationMetadata
from orchestrator.core.operation.config import (
DiscoveryOperationConfiguration,
DiscoveryOperationResourceConfiguration,
ScriptOperatorConf,
)
from orchestrator.core.operation.resource import (
OperationExitStateEnum,
OperationResource,
OperationResourceEventEnum,
OperationResourceStatus,
)

script_module = ScriptOperatorConf(name=name, operationType=operation_type)
extra_metadata = dict(metadata or {})
user_labels = extra_metadata.pop("labels", None) or {}
config_metadata = ConfigurationMetadata(
name=name,
description=description,
labels={
SCRIPT_OPERATION_LABEL_KEY: SCRIPT_OPERATION_EXECUTION_LABEL,
**user_labels,
},
)
for key, value in extra_metadata.items():
setattr(config_metadata, key, value)

operation_payload = DiscoveryOperationResourceConfiguration(
operation=DiscoveryOperationConfiguration(
module=script_module,
parameters={},
),
metadata=config_metadata,
spaces=[self.uri],
)

operation = OperationResource(
operationType=script_module.operationType,
operatorIdentifier=script_module.operatorIdentifier,
config=operation_payload,
)

self.addOperation(operation)
self._verified_operation_ids.add(operation.identifier)

try:
operation.status.append(
OperationResourceStatus(event=OperationResourceEventEnum.STARTED)
)
self.updateOperation(operation)
yield operation.identifier
operation.status.append(
OperationResourceStatus(
event=OperationResourceEventEnum.FINISHED,
exit_state=OperationExitStateEnum.SUCCESS,
)
)
except Exception:
operation.status.append(
OperationResourceStatus(
event=OperationResourceEventEnum.FINISHED,
exit_state=OperationExitStateEnum.FAIL,
)
)
raise
finally:
self.updateOperation(operation)

@_perform_preflight_checks_for_sample_store_methods
def complete_measurement_request_with_results_timeseries(
self,
Expand Down
34 changes: 30 additions & 4 deletions orchestrator/core/operation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class DiscoveryOperationEnum(enum.Enum):
LEARN = "learn"
QUERY = "query"
EXPORT = "export"
SCRIPT = "script"


def get_actuator_configurations(
Expand Down Expand Up @@ -351,6 +352,28 @@ def operatorIdentifier(self) -> str:
return operator.operatorIdentifier if operator else f"{self.operatorName}-None"


class ScriptOperatorConf(pydantic.BaseModel):
"""Identifies an inline script or custom operator not registered in any collection."""

model_config = ConfigDict(extra="forbid")
name: Annotated[str, pydantic.Field(description="Human-readable script name")]
version: Annotated[str, pydantic.Field()] = "0.1.0"
operationType: Annotated[
DiscoveryOperationEnum,
pydantic.Field(
description=(
"Semantic operation type (e.g. search, characterize). "
"Script provenance is recorded separately via operation metadata labels."
),
),
] = DiscoveryOperationEnum.SEARCH

@property
def operatorIdentifier(self) -> str:
"""Return the canonical script operator identifier."""
return f"script-{self.name}-{self.version}"


# ---------------------------------------------------------------------------
# Backwards-compatibility alias — use OperatorReference in new code
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -396,7 +419,7 @@ class DiscoveryOperationConfiguration(pydantic.BaseModel):
model_config = ConfigDict(extra="forbid")

module: Annotated[
OperatorModuleConf | OperatorReference,
OperatorModuleConf | OperatorReference | ScriptOperatorConf,
pydantic.Field(
description="The module or function providing the discovery operation"
),
Expand All @@ -412,8 +435,9 @@ class DiscoveryOperationConfiguration(pydantic.BaseModel):
@pydantic.field_validator("module", mode="after")
@classmethod
def ensure_module_is_installed(
cls, module: OperatorModuleConf | OperatorReference
) -> OperatorModuleConf | OperatorReference:
cls,
module: OperatorModuleConf | OperatorReference | ScriptOperatorConf,
) -> OperatorModuleConf | OperatorReference | ScriptOperatorConf:
"""Validates that the operator module is installed and accessible.

Args:
Expand All @@ -425,7 +449,7 @@ def ensure_module_is_installed(
Raises:
ValueError: If the operator module is not installed or cannot be imported.
"""
if isinstance(module, OperatorReference):
if isinstance(module, OperatorReference | ScriptOperatorConf):
return module

import importlib
Expand Down Expand Up @@ -462,6 +486,8 @@ def validate_and_downcast_parameters(self) -> Self:
self.parameters = operator_metadata.configuration_model.model_validate(
self.parameters
)
elif isinstance(self.module, ScriptOperatorConf):
self.parameters = {}
else:
from orchestrator.modules.operators.collections import (
operationCollectionMap,
Expand Down
52 changes: 52 additions & 0 deletions tests/core/test_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
DiscoveryOperationConfiguration,
DiscoveryOperationEnum,
DiscoveryOperationResourceConfiguration,
ScriptOperatorConf,
)
from orchestrator.core.operation.resource import (
OperationExitStateEnum,
Expand Down Expand Up @@ -186,3 +187,54 @@ def test_add_operation_result(
) -> None:

pass


def test_script_operator_conf_round_trip() -> None:
"""ScriptOperatorConf serialises and validates through operation configuration."""
script_module = ScriptOperatorConf(
name="grid-sweep",
version="1.0.0",
operationType=DiscoveryOperationEnum.SEARCH,
)
assert script_module.operationType == DiscoveryOperationEnum.SEARCH
assert script_module.operatorIdentifier == "script-grid-sweep-1.0.0"

operation_configuration = DiscoveryOperationConfiguration(
module=script_module,
parameters={"ignored": "value"},
)
assert operation_configuration.parameters == {}

resource_configuration = DiscoveryOperationResourceConfiguration(
operation=operation_configuration,
spaces=["space-test123"],
)

dumped = resource_configuration.model_dump()
restored = DiscoveryOperationResourceConfiguration.model_validate(dumped)
assert isinstance(restored.operation.module, ScriptOperatorConf)
assert restored.operation.module.name == "grid-sweep"
assert restored.operation.module.version == "1.0.0"
assert restored.operation.module.operationType == DiscoveryOperationEnum.SEARCH
assert restored.operation.parameters == {}


def test_script_operation_resource_identifier() -> None:
"""OperationResource built from ScriptOperatorConf uses script operator id."""
script_module = ScriptOperatorConf(
name="inline-script",
operationType=DiscoveryOperationEnum.CHARACTERIZE,
)
operation_configuration = DiscoveryOperationResourceConfiguration(
operation=DiscoveryOperationConfiguration(module=script_module),
spaces=["space-test123"],
)
operation = OperationResource(
operationType=script_module.operationType,
operatorIdentifier=script_module.operatorIdentifier,
config=operation_configuration,
)

assert operation.operationType == DiscoveryOperationEnum.CHARACTERIZE
assert operation.operatorIdentifier == "script-inline-script-0.1.0"
assert operation.identifier.startswith("operation-script-inline-script-0.1.0-")
91 changes: 91 additions & 0 deletions tests/core/test_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
DiscoverySpace,
SpaceInconsistencyError,
)
from orchestrator.core.operation.resource import (
OperationResource,
OperationResourceEventEnum,
OperationResourceStatus,
)
from orchestrator.core.resources import CoreResourceKinds
from orchestrator.core.samplestore.base import ActiveSampleStore
from orchestrator.core.samplestore.config import (
SampleStoreConfiguration,
Expand Down Expand Up @@ -319,3 +325,88 @@ def test_matching_entities_table_virtual_property_with_multiple_values(
assert virtual_id in df_with_vp.columns
# Aggregated values should be scalar (not lists or None)
assert df_with_vp[virtual_id].dropna().apply(lambda x: np.isscalar(x)).all()


def _operation_lifecycle_statuses(
operation: OperationResource,
) -> list[OperationResourceStatus]:
return [
status
for status in operation.status
if status.event in OperationResourceEventEnum
]


def test_operation_context_success_lifecycle(pfas_space: DiscoverySpace) -> None:
"""operation_context registers the operation and records STARTED then FINISHED/SUCCESS."""
from orchestrator.core.discoveryspace.space import (
SCRIPT_OPERATION_EXECUTION_LABEL,
SCRIPT_OPERATION_LABEL_KEY,
)
from orchestrator.core.operation.config import (
DiscoveryOperationEnum,
ScriptOperatorConf,
)
from orchestrator.core.operation.resource import (
OperationExitStateEnum,
OperationResource,
OperationResourceEventEnum,
)

with pfas_space.operation_context(
name="test-script",
description="Script operation for testing",
metadata={"labels": {"source": "test"}},
) as operation_id:
assert operation_id.startswith("operation-script-test-script-")
assert operation_id in pfas_space._verified_operation_ids

operation = pfas_space.metadataStore.getResource(
identifier=operation_id,
kind=CoreResourceKinds.OPERATION,
)
assert isinstance(operation, OperationResource)
assert operation_id in pfas_space.operations["IDENTIFIER"].values
assert isinstance(operation.config.operation.module, ScriptOperatorConf)
assert (
operation.config.operation.module.operationType == DiscoveryOperationEnum.SEARCH
)
assert operation.operationType == DiscoveryOperationEnum.SEARCH
assert operation.config.metadata.description == "Script operation for testing"
assert operation.config.metadata.labels == {
SCRIPT_OPERATION_LABEL_KEY: SCRIPT_OPERATION_EXECUTION_LABEL,
"source": "test",
}

lifecycle_statuses = _operation_lifecycle_statuses(operation)
assert lifecycle_statuses[0].event == OperationResourceEventEnum.STARTED
assert lifecycle_statuses[-1].event == OperationResourceEventEnum.FINISHED
assert lifecycle_statuses[-1].exit_state == OperationExitStateEnum.SUCCESS


def test_operation_context_failure_lifecycle(pfas_space: DiscoverySpace) -> None:
"""operation_context records FINISHED/FAIL when the wrapped block raises."""
from orchestrator.core.operation.resource import (
OperationExitStateEnum,
OperationResource,
OperationResourceEventEnum,
)

failure_message = "Simulated script failure during operation_context lifecycle"

with (
pytest.raises(RuntimeError, match=re.escape(failure_message)),
pfas_space.operation_context(name="failing-script") as operation_id,
):
raise RuntimeError(failure_message)

operation = pfas_space.metadataStore.getResource(
identifier=operation_id,
kind=CoreResourceKinds.OPERATION,
)
assert isinstance(operation, OperationResource)

lifecycle_statuses = _operation_lifecycle_statuses(operation)
assert lifecycle_statuses[0].event == OperationResourceEventEnum.STARTED
assert lifecycle_statuses[-1].event == OperationResourceEventEnum.FINISHED
assert lifecycle_statuses[-1].exit_state == OperationExitStateEnum.FAIL