Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.edc.signaling;

import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataAddressStore;
import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class DataPlaneSignalingFlowControllerExtension implements ServiceExtensi
private DataPlaneSelectorService dataPlaneSelectorService;
@Inject
private DataAddressStore dataAddressStore;
@Inject
private AssetIndex assetIndex;

@Override
public String name() {
Expand All @@ -61,6 +64,6 @@ public DataFlowController dataFlowController() {
typeTransformerRegistry.register(new DataFlowStatusMessageToDataFlowResponseTransformer());
typeTransformerRegistry.register(new DspDataAddressToDataAddressTransformer());
return new DataPlaneSignalingFlowController(apiConfiguration.createPublicUri(), dataPlaneSelectorService,
typeTransformerRegistry, clientFactory, dataAddressStore);
typeTransformerRegistry, clientFactory, dataAddressStore, assetIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.DataplaneMetadata;
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataAddressStore;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
Expand Down Expand Up @@ -47,6 +48,7 @@

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.participantcontext.spi.types.ParticipantResource.queryByParticipantContextId;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

/**
Expand All @@ -59,15 +61,17 @@ public class DataPlaneSignalingFlowController implements DataFlowController {
private final TypeTransformerRegistry typeTransformerRegistry;
private final ClientFactory clientFactory;
private final DataAddressStore dataAddressStore;
private final AssetIndex assetIndex;

public DataPlaneSignalingFlowController(URI callbackUri, DataPlaneSelectorService selectorClient,
TypeTransformerRegistry typeTransformerRegistry, ClientFactory clientFactory,
DataAddressStore dataAddressStore) {
DataAddressStore dataAddressStore, AssetIndex assetIndex) {
this.callbackUri = callbackUri;
this.selectorClient = selectorClient;
this.typeTransformerRegistry = typeTransformerRegistry;
this.clientFactory = clientFactory;
this.dataAddressStore = dataAddressStore;
this.assetIndex = assetIndex;
}

@Override
Expand Down Expand Up @@ -264,16 +268,19 @@ public Set<String> transferTypesFor(Asset asset) {
var profiles = Optional.ofNullable(asset.getDataplaneMetadata())
.map(DataplaneMetadata::getProfiles)
.orElse(List.of());
return transferTypes(profiles);
return transferTypes(asset.getParticipantContextId(), profiles);
}

@Override
public Set<String> transferTypesFor(String assetId) {
return transferTypes(List.of());
return Optional.ofNullable(assetIndex.findById(assetId))
.map(this::transferTypesFor)
.orElseGet(Collections::emptySet);
}

private @NotNull Set<String> transferTypes(List<String> profiles) {
return selectorClient.getAll().map(Collection::stream)
private @NotNull Set<String> transferTypes(String participantContextId, List<String> profiles) {
return selectorClient.search(queryByParticipantContextId(participantContextId).build())
.map(Collection::stream)
.map(dataPlane -> dataPlane.map(DataPlaneInstance::getAllowedTransferTypes)
.filter(allowedTransferTypes -> profiles.isEmpty() || !Collections.disjoint(allowedTransferTypes, profiles))
.flatMap(Collection::stream).collect(toSet()))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2026 Think-it GmbH
* Copyright (c) 2026 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand All @@ -8,7 +8,7 @@
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
* Metaform Systems, Inc. - initial API and implementation
*
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2026 Think-it GmbH
* Copyright (c) 2026 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
Expand All @@ -8,7 +8,7 @@
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Think-it GmbH - initial API and implementation
* Metaform Systems, Inc. - initial API and implementation
*
*/

Expand Down Expand Up @@ -41,7 +41,7 @@
@Produces(APPLICATION_JSON)
@Consumes(APPLICATION_JSON)
@Path("/v5beta/participants/{participantContextId}/dataplanes")
public class DataPlaneRegistrationApiV5Controller implements org.eclipse.edc.signaling.port.api.management.v5.DataPlaneRegistrationApiV5 {
public class DataPlaneRegistrationApiV5Controller implements DataPlaneRegistrationApiV5 {

private final DataPlaneSelectorService dataPlaneSelectorService;
private final AuthorizationService authorizationService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.DataplaneMetadata;
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataAddressStore;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
Expand Down Expand Up @@ -67,10 +68,11 @@ public class DataPlaneSignalingFlowControllerTest {
private final DataPlaneSelectorService selectorService = mock();
private final TypeTransformerRegistry typeTransformerRegistry = mock();
private final DataAddressStore dataAddressStore = mock();
private final AssetIndex assetIndex = mock();

private final DataPlaneSignalingFlowController flowController = new DataPlaneSignalingFlowController(
URI.create("http://localhost"), selectorService,
typeTransformerRegistry, clientFactory, dataAddressStore);
typeTransformerRegistry, clientFactory, dataAddressStore, assetIndex);

@Nested
class Prepare {
Expand Down Expand Up @@ -488,7 +490,7 @@ class TransferTypes {
void transferTypes_shouldReturnTypesForSpecifiedAsset() {

var assetNoResponse = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build();
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(
when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of(
dataPlaneInstanceBuilder().allowedTransferType("Custom-PUSH").build(),
dataPlaneInstanceBuilder().allowedTransferType("Custom-PULL").build()
)));
Expand All @@ -500,7 +502,7 @@ void transferTypes_shouldReturnTypesForSpecifiedAsset() {

@Test
void shouldReturnEmptyList_whenCannotGetDataplaneInstances() {
when(selectorService.getAll()).thenReturn(ServiceResult.unexpected("error"));
when(selectorService.search(any())).thenReturn(ServiceResult.unexpected("error"));
var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build();

var transferTypes = flowController.transferTypesFor(asset);
Expand All @@ -510,7 +512,7 @@ void shouldReturnEmptyList_whenCannotGetDataplaneInstances() {

@Test
void shouldFilterByAssetProfile_whenProfilesAreSet() {
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(
when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of(
dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build(),
dataPlaneInstanceBuilder().allowedTransferType("S3-PULL").build()
)));
Expand All @@ -527,7 +529,7 @@ void shouldFilterByAssetProfile_whenProfilesAreSet() {

@Test
void shouldReturnAllTypes_whenAssetHasNoProfiles() {
when(selectorService.getAll()).thenReturn(ServiceResult.success(List.of(
when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of(
dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build(),
dataPlaneInstanceBuilder().allowedTransferType("S3-PULL").build()
)));
Expand All @@ -539,6 +541,54 @@ void shouldReturnAllTypes_whenAssetHasNoProfiles() {

assertThat(transferTypes).containsExactlyInAnyOrder("Http-PULL", "S3-PULL");
}

@Test
void shouldFilterDataPlanesByParticipantContext() {
when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of(
dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build()
)));
var asset = Asset.Builder.newInstance()
.participantContextId("participant-1")
.dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build())
.build();

flowController.transferTypesFor(asset);

verify(selectorService).search(argThat(querySpec -> querySpec.getFilterExpression().stream().anyMatch(criterion ->
"participantContextId".equals(criterion.getOperandLeft()) &&
"=".equals(criterion.getOperator()) &&
"participant-1".equals(criterion.getOperandRight()))));
}

@Test
void shouldResolveAssetById_whenLookingUpByAssetId() {
var asset = Asset.Builder.newInstance()
.id("assetId")
.participantContextId("participant-1")
.dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build())
.build();
when(assetIndex.findById("assetId")).thenReturn(asset);
when(selectorService.search(any())).thenReturn(ServiceResult.success(List.of(
dataPlaneInstanceBuilder().allowedTransferType("Http-PULL").build()
)));

var transferTypes = flowController.transferTypesFor("assetId");

assertThat(transferTypes).containsExactly("Http-PULL");
verify(selectorService).search(argThat(querySpec -> querySpec.getFilterExpression().stream().anyMatch(criterion ->
"participantContextId".equals(criterion.getOperandLeft()) &&
"participant-1".equals(criterion.getOperandRight()))));
}

@Test
void shouldReturnEmpty_whenAssetIdNotFound() {
when(assetIndex.findById("missing")).thenReturn(null);

var transferTypes = flowController.transferTypesFor("missing");

assertThat(transferTypes).isEmpty();
verifyNoInteractions(selectorService);
}
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
{
"version": "5.0.0-beta",
"urlPath": "/v5beta",
"lastUpdated": "2026-05-27T09:00:00Z",
"lastUpdated": "2026-05-28T09:00:00Z",
"maturity": "beta"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void requestCatalog_shouldReturnCatalog_withQuerySpec(ManagementEndToEndV5TestCo

var asset2 = createAsset("id-2", "test-type")
.property(DCT_CONFORMS_TO_ATTRIBUTE, Map.of(ID, "https://example.org/schema")).build();

assetIndex.create(asset2);
createContractOffer(policyDefinitionStore, contractDefinitionStore, List.of());

Expand Down Expand Up @@ -368,7 +368,9 @@ void getDataset_shouldReturnDataset(ManagementEndToEndV5TestContext context, Ass
ContractDefinitionStore contractDefinitionStore,
DataPlaneInstanceStore dataPlaneInstanceStore) {
var dataPlaneInstance = DataPlaneInstance.Builder.newInstance().url("http://localhost/any")
.allowedSourceType("test-type").allowedTransferType("any-PULL").build();
.allowedSourceType("test-type").allowedTransferType("any-PULL")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
dataPlaneInstanceStore.save(dataPlaneInstance);

createContractOffer(policyDefinitionStore, contractDefinitionStore, List.of());
Expand Down Expand Up @@ -430,7 +432,9 @@ void getDatasetWithResponseChannel_shouldReturnDataset(ManagementEndToEndV5TestC

var dataPlaneInstance = DataPlaneInstance.Builder.newInstance().url("http://localhost/any")
.allowedDestType("any").allowedSourceType("test-type")
.allowedTransferType("any-PULL").allowedTransferType("any-PULL-response").build();
.allowedTransferType("any-PULL").allowedTransferType("any-PULL-response")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
dataPlaneInstanceStore.save(dataPlaneInstance);

var responseChannel = DataAddress.Builder.newInstance()
Expand Down Expand Up @@ -467,13 +471,17 @@ void getDatasetWithResponseChannel_shouldReturnDataset(ManagementEndToEndV5TestC

@Test
void getDataset_shouldFilterDistributionsByAssetProfiles(ManagementEndToEndV5TestContext context, AssetIndex assetIndex,
DataPlaneInstanceStore dataPlaneInstanceStore,
PolicyDefinitionStore policyDefinitionStore,
ContractDefinitionStore contractDefinitionStore) {
DataPlaneInstanceStore dataPlaneInstanceStore,
PolicyDefinitionStore policyDefinitionStore,
ContractDefinitionStore contractDefinitionStore) {
var httpDataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any")
.allowedSourceType("test-type").allowedTransferType("Http-PULL").build();
.allowedSourceType("test-type").allowedTransferType("Http-PULL")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
var s3DataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any-s3")
.allowedSourceType("test-type").allowedTransferType("S3-PULL").build();
.allowedSourceType("test-type").allowedTransferType("S3-PULL")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
dataPlaneInstanceStore.save(httpDataPlane);
dataPlaneInstanceStore.save(s3DataPlane);

Expand Down Expand Up @@ -506,13 +514,17 @@ void getDataset_shouldFilterDistributionsByAssetProfiles(ManagementEndToEndV5Tes

@Test
void getDataset_shouldReturnAllDistributions_whenAssetHasNoProfiles(ManagementEndToEndV5TestContext context, AssetIndex assetIndex,
DataPlaneInstanceStore dataPlaneInstanceStore,
PolicyDefinitionStore policyDefinitionStore,
ContractDefinitionStore contractDefinitionStore) {
DataPlaneInstanceStore dataPlaneInstanceStore,
PolicyDefinitionStore policyDefinitionStore,
ContractDefinitionStore contractDefinitionStore) {
var httpDataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any")
.allowedSourceType("test-type").allowedTransferType("Http-PULL").build();
.allowedSourceType("test-type").allowedTransferType("Http-PULL")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
var s3DataPlane = DataPlaneInstance.Builder.newInstance().url("http://localhost/any-s3")
.allowedSourceType("test-type").allowedTransferType("S3-PULL").build();
.allowedSourceType("test-type").allowedTransferType("S3-PULL")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
dataPlaneInstanceStore.save(httpDataPlane);
dataPlaneInstanceStore.save(s3DataPlane);

Expand Down Expand Up @@ -575,7 +587,9 @@ void getDataset_tokenBearerIsAdmin(ManagementEndToEndV5TestContext context, Data
PolicyDefinitionStore policyDefinitionStore, ContractDefinitionStore contractDefinitionStore,
AssetIndex assetIndex, OauthServer authServer) {
var dataPlaneInstance = DataPlaneInstance.Builder.newInstance().url("http://localhost/any")
.allowedSourceType("test-type").allowedTransferType("any-PULL").build();
.allowedSourceType("test-type").allowedTransferType("any-PULL")
.participantContextId(PARTICIPANT_CONTEXT_ID)
.build();
dataPlaneInstanceStore.save(dataPlaneInstance);

createContractOffer(policyDefinitionStore, contractDefinitionStore, List.of());
Expand Down
Loading
Loading