diff --git a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingFlowControllerExtension.java b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingFlowControllerExtension.java index 61fceacc411..5eeee7c9e58 100644 --- a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingFlowControllerExtension.java +++ b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/DataPlaneSignalingFlowControllerExtension.java @@ -14,6 +14,7 @@ package org.eclipse.edc.signaling; +import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataAddressStore; import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; @@ -48,6 +49,8 @@ public class DataPlaneSignalingFlowControllerExtension implements ServiceExtensi private DataPlaneSelectorService dataPlaneSelectorService; @Inject private DataAddressStore dataAddressStore; + @Inject + private AssetIndex assetIndex; @Override public String name() { @@ -61,6 +64,6 @@ public DataFlowController dataFlowController() { typeTransformerRegistry.register(new DataFlowStatusMessageToDataFlowResponseTransformer()); typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer()); return new DataPlaneSignalingFlowController(apiConfiguration.createPublicUri(), dataPlaneSelectorService, - typeTransformerRegistry, clientFactory, dataAddressStore); + typeTransformerRegistry, clientFactory, dataAddressStore, assetIndex); } } diff --git a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java index c21d4b101de..2ea2310f8ea 100644 --- a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java +++ b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowController.java @@ -16,6 +16,7 @@ import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; import org.eclipse.edc.connector.controlplane.asset.spi.domain.DataplaneMetadata; +import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex; import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataAddressStore; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; @@ -47,6 +48,7 @@ import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toSet; +import static org.eclipse.edc.participantcontext.spi.types.ParticipantResource.queryByParticipantContextId; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; /** @@ -59,15 +61,17 @@ public class DataPlaneSignalingFlowController implements DataFlowController { private final TypeTransformerRegistry typeTransformerRegistry; private final ClientFactory clientFactory; private final DataAddressStore dataAddressStore; + private final AssetIndex assetIndex; public DataPlaneSignalingFlowController(URI callbackUri, DataPlaneSelectorService selectorClient, TypeTransformerRegistry typeTransformerRegistry, ClientFactory clientFactory, - DataAddressStore dataAddressStore) { + DataAddressStore dataAddressStore, AssetIndex assetIndex) { this.callbackUri = callbackUri; this.selectorClient = selectorClient; this.typeTransformerRegistry = typeTransformerRegistry; this.clientFactory = clientFactory; this.dataAddressStore = dataAddressStore; + this.assetIndex = assetIndex; } @Override @@ -264,16 +268,19 @@ public Set transferTypesFor(Asset asset) { var profiles = Optional.ofNullable(asset.getDataplaneMetadata()) .map(DataplaneMetadata::getProfiles) .orElse(List.of()); - return transferTypes(profiles); + return transferTypes(asset.getParticipantContextId(), profiles); } @Override public Set transferTypesFor(String assetId) { - return transferTypes(List.of()); + return Optional.ofNullable(assetIndex.findById(assetId)) + .map(this::transferTypesFor) + .orElseGet(Collections::emptySet); } - private @NotNull Set transferTypes(List profiles) { - return selectorClient.getAll().map(Collection::stream) + private @NotNull Set transferTypes(String participantContextId, List profiles) { + return selectorClient.search(queryByParticipantContextId(participantContextId).build()) + .map(Collection::stream) .map(dataPlane -> dataPlane.map(DataPlaneInstance::getAllowedTransferTypes) .filter(allowedTransferTypes -> profiles.isEmpty() || !Collections.disjoint(allowedTransferTypes, profiles)) .flatMap(Collection::stream).collect(toSet())) diff --git a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/DataPlaneRegistrationApiV5.java b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/v5/DataPlaneRegistrationApiV5.java similarity index 96% rename from data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/DataPlaneRegistrationApiV5.java rename to data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/v5/DataPlaneRegistrationApiV5.java index 3646351c453..e2ac97480fb 100644 --- a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/DataPlaneRegistrationApiV5.java +++ b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/v5/DataPlaneRegistrationApiV5.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2026 Think-it GmbH + * Copyright (c) 2026 Metaform Systems, Inc. * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -8,7 +8,7 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: - * Think-it GmbH - initial API and implementation + * Metaform Systems, Inc. - initial API and implementation * */ diff --git a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/DataPlaneRegistrationApiV5Controller.java b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/v5/DataPlaneRegistrationApiV5Controller.java similarity index 94% rename from data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/DataPlaneRegistrationApiV5Controller.java rename to data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/v5/DataPlaneRegistrationApiV5Controller.java index c478de43849..7ad8d168e9d 100644 --- a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/DataPlaneRegistrationApiV5Controller.java +++ b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/main/java/org/eclipse/edc/signaling/port/api/management/v5/DataPlaneRegistrationApiV5Controller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2026 Think-it GmbH + * Copyright (c) 2026 Metaform Systems, Inc. * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -8,7 +8,7 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: - * Think-it GmbH - initial API and implementation + * Metaform Systems, Inc. - initial API and implementation * */ @@ -41,7 +41,7 @@ @Produces(APPLICATION_JSON) @Consumes(APPLICATION_JSON) @Path("/v5beta/participants/{participantContextId}/dataplanes") -public class DataPlaneRegistrationApiV5Controller implements org.eclipse.edc.signaling.port.api.management.v5.DataPlaneRegistrationApiV5 { +public class DataPlaneRegistrationApiV5Controller implements DataPlaneRegistrationApiV5 { private final DataPlaneSelectorService dataPlaneSelectorService; private final AuthorizationService authorizationService; diff --git a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java index aac3e10e538..e9473e3bb5b 100644 --- a/data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java +++ b/data-protocols/data-plane-signaling/data-plane-signaling-core/src/test/java/org/eclipse/edc/signaling/logic/DataPlaneSignalingFlowControllerTest.java @@ -16,6 +16,7 @@ import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset; import org.eclipse.edc.connector.controlplane.asset.spi.domain.DataplaneMetadata; +import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataAddressStore; import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess; @@ -67,10 +68,11 @@ public class DataPlaneSignalingFlowControllerTest { private final DataPlaneSelectorService selectorService = mock(); private final TypeTransformerRegistry typeTransformerRegistry = mock(); private final DataAddressStore dataAddressStore = mock(); + private final AssetIndex assetIndex = mock(); private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController( URI.create("http://localhost"), selectorService, - typeTransformerRegistry, clientFactory, dataAddressStore); + typeTransformerRegistry, clientFactory, dataAddressStore, assetIndex); @Nested class Prepare { @@ -488,7 +490,7 @@ class TransferTypes { void transferTypes_shouldReturnTypesForSpecifiedAsset() { var assetNoResponse = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); - when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of( + when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of( dataPlaneInstanceBuilder().allowedTransferType("Custom-PUSH").build(), dataPlaneInstanceBuilder().allowedTransferType("Custom-PULL").build() ))); @@ -500,7 +502,7 @@ void transferTypes_shouldReturnTypesForSpecifiedAsset() { @Test void shouldReturnEmptyList_whenCannotGetDataplaneInstances() { - when(selectorService.getAll()).thenReturn(ServiceResult.unexpected("error")); + when(selectorService.search(any())).thenReturn(ServiceResult.unexpected("error")); var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); var transferTypes = flowController.transferTypesFor(asset); @@ -510,7 +512,7 @@ void shouldReturnEmptyList_whenCannotGetDataplaneInstances() { @Test void shouldFilterByAssetProfile_whenProfilesAreSet() { - when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of( + when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of( dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build(), dataPlaneInstanceBuilder().allowedTransferType("S3-PULL").build() ))); @@ -527,7 +529,7 @@ void shouldFilterByAssetProfile_whenProfilesAreSet() { @Test void shouldReturnAllTypes_whenAssetHasNoProfiles() { - when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of( + when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of( dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build(), dataPlaneInstanceBuilder().allowedTransferType("S3-PULL").build() ))); @@ -539,6 +541,54 @@ void shouldReturnAllTypes_whenAssetHasNoProfiles() { assertThat(transferTypes).containsExactlyInAnyOrder("Http-PULL", "S3-PULL"); } + + @Test + void shouldFilterDataPlanesByParticipantContext() { + when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of( + dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build() + ))); + var asset = Asset.Builder.newInstance() + .participantContextId("participant-1") + .dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()) + .build(); + + flowController.transferTypesFor(asset); + + verify(selectorService).search(argThat(querySpec -> querySpec.getFilterExpression().stream().anyMatch(criterion -> + "participantContextId".equals(criterion.getOperandLeft()) && + "=".equals(criterion.getOperator()) && + "participant-1".equals(criterion.getOperandRight())))); + } + + @Test + void shouldResolveAssetById_whenLookingUpByAssetId() { + var asset = Asset.Builder.newInstance() + .id("assetId") + .participantContextId("participant-1") + .dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()) + .build(); + when(assetIndex.findById("assetId")).thenReturn(asset); + when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of( + dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build() + ))); + + var transferTypes = flowController.transferTypesFor("assetId"); + + assertThat(transferTypes).containsExactly("Http-PULL"); + verify(selectorService).search(argThat(querySpec -> querySpec.getFilterExpression().stream().anyMatch(criterion -> + "participantContextId".equals(criterion.getOperandLeft()) && + "participant-1".equals(criterion.getOperandRight())))); + } + + @Test + void shouldReturnEmpty_whenAssetIdNotFound() { + when(assetIndex.findById("missing")).thenReturn(null); + + var transferTypes = flowController.transferTypesFor("missing"); + + assertThat(transferTypes).isEmpty(); + verifyNoInteractions(selectorService); + } } @NotNull diff --git a/extensions/common/api/management-api-configuration/src/main/resources/management-api-version.json b/extensions/common/api/management-api-configuration/src/main/resources/management-api-version.json index 766ec766153..7cde90ff09f 100644 --- a/extensions/common/api/management-api-configuration/src/main/resources/management-api-version.json +++ b/extensions/common/api/management-api-configuration/src/main/resources/management-api-version.json @@ -14,7 +14,7 @@ { "version": "5.0.0-beta", "urlPath": "/v5beta", - "lastUpdated": "2026-05-27T09:00:00Z", + "lastUpdated": "2026-05-28T09:00:00Z", "maturity": "beta" } ] diff --git a/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/v5/CatalogApiV5EndToEndTest.java b/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/v5/CatalogApiV5EndToEndTest.java index 6d99034600a..16ce43ec9bd 100644 --- a/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/v5/CatalogApiV5EndToEndTest.java +++ b/system-tests/management-api/management-api-test-runner/src/test/java/org/eclipse/edc/test/e2e/managementapi/v5/CatalogApiV5EndToEndTest.java @@ -226,7 +226,7 @@ void requestCatalog_shouldReturnCatalog_withQuerySpec(ManagementEndToEndV5TestCo var asset2 = createAsset("id-2", "test-type") .property(DCT_CONFORMS_TO_ATTRIBUTE, Map.of(ID, "https://example.org/schema")).build(); - + assetIndex.create(asset2); createContractOffer(policyDefinitionStore, contractDefinitionStore, List.of()); @@ -368,7 +368,9 @@ void getDataset_shouldReturnDataset(ManagementEndToEndV5TestContext context, Ass ContractDefinitionStore contractDefinitionStore, DataPlaneInstanceStore dataPlaneInstanceStore) { var dataPlaneInstance = DataPlaneInstance.Builder.newInstance().url("http://localhost/any") - .allowedSourceType("test-type").allowedTransferType("any-PULL").build(); + .allowedSourceType("test-type").allowedTransferType("any-PULL") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); dataPlaneInstanceStore.save(dataPlaneInstance); createContractOffer(policyDefinitionStore, contractDefinitionStore, List.of()); @@ -430,7 +432,9 @@ void getDatasetWithResponseChannel_shouldReturnDataset(ManagementEndToEndV5TestC var dataPlaneInstance = DataPlaneInstance.Builder.newInstance().url("http://localhost/any") .allowedDestType("any").allowedSourceType("test-type") - .allowedTransferType("any-PULL").allowedTransferType("any-PULL-response").build(); + .allowedTransferType("any-PULL").allowedTransferType("any-PULL-response") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); dataPlaneInstanceStore.save(dataPlaneInstance); var responseChannel = DataAddress.Builder.newInstance() @@ -467,13 +471,17 @@ void getDatasetWithResponseChannel_shouldReturnDataset(ManagementEndToEndV5TestC @Test void getDataset_shouldFilterDistributionsByAssetProfiles(ManagementEndToEndV5TestContext context, AssetIndex assetIndex, - DataPlaneInstanceStore dataPlaneInstanceStore, - PolicyDefinitionStore policyDefinitionStore, - ContractDefinitionStore contractDefinitionStore) { + DataPlaneInstanceStore dataPlaneInstanceStore, + PolicyDefinitionStore policyDefinitionStore, + ContractDefinitionStore contractDefinitionStore) { var httpDataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any") - .allowedSourceType("test-type").allowedTransferType("Http-PULL").build(); + .allowedSourceType("test-type").allowedTransferType("Http-PULL") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); var s3DataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any-s3") - .allowedSourceType("test-type").allowedTransferType("S3-PULL").build(); + .allowedSourceType("test-type").allowedTransferType("S3-PULL") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); dataPlaneInstanceStore.save(httpDataPlane); dataPlaneInstanceStore.save(s3DataPlane); @@ -506,13 +514,17 @@ void getDataset_shouldFilterDistributionsByAssetProfiles(ManagementEndToEndV5Tes @Test void getDataset_shouldReturnAllDistributions_whenAssetHasNoProfiles(ManagementEndToEndV5TestContext context, AssetIndex assetIndex, - DataPlaneInstanceStore dataPlaneInstanceStore, - PolicyDefinitionStore policyDefinitionStore, - ContractDefinitionStore contractDefinitionStore) { + DataPlaneInstanceStore dataPlaneInstanceStore, + PolicyDefinitionStore policyDefinitionStore, + ContractDefinitionStore contractDefinitionStore) { var httpDataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any") - .allowedSourceType("test-type").allowedTransferType("Http-PULL").build(); + .allowedSourceType("test-type").allowedTransferType("Http-PULL") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); var s3DataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any-s3") - .allowedSourceType("test-type").allowedTransferType("S3-PULL").build(); + .allowedSourceType("test-type").allowedTransferType("S3-PULL") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); dataPlaneInstanceStore.save(httpDataPlane); dataPlaneInstanceStore.save(s3DataPlane); @@ -575,7 +587,9 @@ void getDataset_tokenBearerIsAdmin(ManagementEndToEndV5TestContext context, Data PolicyDefinitionStore policyDefinitionStore, ContractDefinitionStore contractDefinitionStore, AssetIndex assetIndex, OauthServer authServer) { var dataPlaneInstance = DataPlaneInstance.Builder.newInstance().url("http://localhost/any") - .allowedSourceType("test-type").allowedTransferType("any-PULL").build(); + .allowedSourceType("test-type").allowedTransferType("any-PULL") + .participantContextId(PARTICIPANT_CONTEXT_ID) + .build(); dataPlaneInstanceStore.save(dataPlaneInstance); createContractOffer(policyDefinitionStore, contractDefinitionStore, List.of()); diff --git a/system-tests/tck/dps-tck-connector-under-test/src/main/java/org/eclipse/edc/tck/dps/signaling/DpsTckExtension.java b/system-tests/tck/dps-tck-connector-under-test/src/main/java/org/eclipse/edc/tck/dps/signaling/DpsTckExtension.java index 2904fa7b8ad..2de66f6efc5 100644 --- a/system-tests/tck/dps-tck-connector-under-test/src/main/java/org/eclipse/edc/tck/dps/signaling/DpsTckExtension.java +++ b/system-tests/tck/dps-tck-connector-under-test/src/main/java/org/eclipse/edc/tck/dps/signaling/DpsTckExtension.java @@ -53,15 +53,12 @@ public class DpsTckExtension implements ServiceExtension { public static final String NAME = "DPS TCK Signaling Trigger"; private static final String CONTEXT = "tck"; - - @Configuration - private TckWebhookApiConfiguration webhookApiConfiguration; @Setting(description = "Configures the participant context id for the tck suite runtime", key = "edc.participant.context.id", defaultValue = "participantContextId") public String participantContextId; - @Setting(description = "Base URL of the TCK acting as data plane, equal to dataspacetck.callback.address", key = "edc.tck.dataplane.url") public String tckDataPlaneUrl; - + @Configuration + private TckWebhookApiConfiguration webhookApiConfiguration; @Inject private PortMappingRegistry portMappingRegistry; @Inject @@ -91,14 +88,16 @@ public void initialize(ServiceExtensionContext context) { transferProcessService, participantContextSupplier, monitor)); dataPlaneSelectorService.register(DataPlaneInstance.Builder.newInstance() - .id(UUID.randomUUID().toString()) - .url(tckDataPlaneUrl + "/dataflows") - .allowedTransferType(Set.of("HttpData-PULL", "HttpData-PUSH")) - .participantContextId(participantContextId) - .build()) + .id(UUID.randomUUID().toString()) + .url(tckDataPlaneUrl + "/dataflows") + .allowedTransferType(Set.of("HttpData-PULL", "HttpData-PUSH")) + .participantContextId(participantContextId) + .build()) .orElseThrow(f -> new EdcException("Failed to register TCK data plane: " + f.getFailureDetail())); - var asset = Asset.Builder.newInstance().build(); + var asset = Asset.Builder.newInstance() + .participantContextId(participantContextId) + .build(); assetService.create(asset); contractNegotiationStore.save(ContractNegotiation.Builder.newInstance()