Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
public ServiceResult<TransferProcess> notifyRequested(ParticipantContext participantContext, TransferRequestMessage message, TokenRepresentation tokenRepresentation) {
return transactionContext.execute(() -> fetchContractAgreement(participantContext, message)
.compose(contractAgreement -> verifyRequest(participantContext, tokenRepresentation, message, contractAgreement))
.compose(context -> validateRequestMessage(message, context))
.compose(context -> requestedAction(participantContext, message, context)));
}

Expand All @@ -111,9 +110,8 @@
@NotNull
public ServiceResult<TransferProcess> notifyStarted(ParticipantContext participantContext, TransferStartMessage message, TokenRepresentation tokenRepresentation) {
return transactionContext.execute(() -> fetchRequestContext(participantContext, message.getProcessId())
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement())
.compose(this::validateCounterParty))
.compose(context -> onMessageDo(participantContext, message, transferProcess -> startedAction(message, transferProcess)))
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement()))
.compose(context -> onMessageDo(message, transferProcess -> startedAction(message, transferProcess)))
);
}

Expand All @@ -122,9 +120,8 @@
@NotNull
public ServiceResult<TransferProcess> notifyCompleted(ParticipantContext participantContext, TransferCompletionMessage message, TokenRepresentation tokenRepresentation) {
return transactionContext.execute(() -> fetchRequestContext(participantContext, message.getProcessId())
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement())
.compose(this::validateCounterParty))
.compose(i -> onMessageDo(participantContext, message, transferProcess -> completedAction(message, transferProcess)))
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement()))
.compose(i -> onMessageDo(message, transferProcess -> completedAction(message, transferProcess)))
);
}

Expand All @@ -133,9 +130,8 @@
@NotNull
public ServiceResult<TransferProcess> notifySuspended(ParticipantContext participantContext, TransferSuspensionMessage message, TokenRepresentation tokenRepresentation) {
return transactionContext.execute(() -> fetchRequestContext(participantContext, message.getProcessId())
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement())
.compose(this::validateCounterParty))
.compose(i -> onMessageDo(participantContext, message, transferProcess -> suspendedAction(message, transferProcess)))
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement()))
.compose(i -> onMessageDo(message, transferProcess -> suspendedAction(message, transferProcess)))
);
}

Expand All @@ -144,9 +140,8 @@
@NotNull
public ServiceResult<TransferProcess> notifyTerminated(ParticipantContext participantContext, TransferTerminationMessage message, TokenRepresentation tokenRepresentation) {
return transactionContext.execute(() -> fetchRequestContext(participantContext, message.getProcessId())
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement())
.compose(this::validateCounterParty))
.compose(i -> onMessageDo(participantContext, message, transferProcess -> terminatedAction(message, transferProcess)))
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement()))
.compose(i -> onMessageDo(message, transferProcess -> terminatedAction(message, transferProcess)))
);
}

Expand All @@ -157,30 +152,49 @@

return transactionContext.execute(() -> fetchRequestContext(participantContext, message.getTransferProcessId())
.compose(context -> verifyRequest(participantContext, tokenRepresentation, message, context.agreement())
.compose(this::validateCounterParty)
.map(it -> context.transferProcess()))
);
}

@NotNull
private ServiceResult<TransferProcess> requestedAction(ParticipantContext participantContext, TransferRequestMessage message, ClaimTokenContext claimTokenContext) {
private ServiceResult<TransferProcess> requestedAction(ParticipantContext participantContext, TransferRequestMessage message, ClaimTokenContext context) {
var destination = message.getDataAddress();
if (destination != null) {
var validDestination = dataAddressValidator.validateDestination(destination);
if (validDestination.failed()) {
return ServiceResult.badRequest(validDestination.getFailureMessages());
}
}

var transferType = message.getTransferType();
var supportedTransferTypes = dataFlowController.transferTypesFor(context.agreement().getAssetId());
if (!supportedTransferTypes.contains(transferType)) {
return ServiceResult.badRequest("TransferType %s is not supported".formatted(transferType));
}

var validationResult = contractValidationService.validateAgreement(context.participantAgent(), context.agreement());
if (validationResult.failed()) {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "agreement not found or not valid"));
}

var existingTransferProcess = transferProcessStore.findForCorrelationId(message.getConsumerPid());
if (existingTransferProcess != null) {
return ServiceResult.success(existingTransferProcess);
}

return transferProcessProviderFactory.create(participantContext, message, claimTokenContext.agreement(), claimTokenContext.participantAgent())
return transferProcessProviderFactory.create(participantContext, message, context.agreement(), context.participantAgent())
.compose(process -> {
var dataAddressStorage = message.getDataAddress() == null
? StoreResult.success()
: dataAddressStore.store(message.getDataAddress(), process);

return dataAddressStorage.flatMap(ServiceResult::from)
.onSuccess(ignored -> {
return dataAddressStorage
.compose(i -> {

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'i' is never used.
Comment thread
ndr-brt marked this conversation as resolved.
Dismissed
process.protocolMessageReceived(message.getId());
update(process);
observable.invokeForEach(l -> l.initiated(process));
return update(process);
})
.onSuccess(i -> observable.invokeForEach(l -> l.initiated(process)))
.flatMap(ServiceResult::from)
.map(ignored -> process);
});
}
Expand Down Expand Up @@ -276,29 +290,6 @@
}
}

private ServiceResult<ClaimTokenContext> validateRequestMessage(TransferRequestMessage message, ClaimTokenContext context) {
var destination = message.getDataAddress();
if (destination != null) {
var validDestination = dataAddressValidator.validateDestination(destination);
if (validDestination.failed()) {
return ServiceResult.badRequest(validDestination.getFailureMessages());
}
}

var transferType = message.getTransferType();
var supportedTransferTypes = dataFlowController.transferTypesFor(context.agreement().getAssetId());
if (!supportedTransferTypes.contains(transferType)) {
return ServiceResult.badRequest("TransferType %s is not supported".formatted(transferType));
}

var validationResult = contractValidationService.validateAgreement(context.participantAgent(), context.agreement());
if (validationResult.failed()) {
return ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "agreement not found or not valid"));
}

return ServiceResult.success(context);
}

private ServiceResult<ContractAgreement> fetchContractAgreement(ParticipantContext participantContext, TransferRequestMessage message) {
return Optional.ofNullable(findAgreement(participantContext, message.getContractId()))
.filter(agreement -> participantContext.getParticipantContextId().equals(agreement.getParticipantContextId()))
Expand All @@ -307,61 +298,53 @@
}

private ServiceResult<TransferMessageContext> fetchRequestContext(ParticipantContext participantContext, String transferProcessId) {
return findTransferProcessByIdReadOnly(participantContext, transferProcessId)
.compose(transferProcess -> {
var agreement = negotiationStore.findContractAgreement(transferProcess.getContractId());
if (agreement == null) {
return ServiceResult.notFound(format("No transfer process with id %s found", transferProcess.getId()));
}
return ServiceResult.success(new TransferMessageContext(agreement, transferProcess));
});
var transferProcess = transferProcessStore.findById(transferProcessId);
if (transferProcess == null) {
return notFound(transferProcessId);
}

if (!participantContext.getParticipantContextId().equals(transferProcess.getParticipantContextId())) {
return notFound(transferProcess.getId());
}

var agreement = negotiationStore.findContractAgreement(transferProcess.getContractId());
if (agreement == null) {
return ServiceResult.notFound(format("No transfer process with id %s found", transferProcess.getId()));
}
return ServiceResult.success(new TransferMessageContext(agreement, transferProcess));
}

private ServiceResult<ClaimTokenContext> verifyRequest(ParticipantContext participantContext, TokenRepresentation tokenRepresentation, RemoteMessage message, ContractAgreement contractAgreement) {
var result = protocolTokenValidator.verify(participantContext, tokenRepresentation, RequestTransferProcessPolicyContext::new, contractAgreement.getPolicy(), message);
if (result.failed()) {
monitor.debug(() -> "Verification Failed: %s".formatted(result.getFailureDetail()));
return ServiceResult.notFound("Not found");
} else {
return ServiceResult.success(new ClaimTokenContext(result.getContent(), contractAgreement));
}
}

private ServiceResult<TransferProcess> onMessageDo(ParticipantContext participantContext, TransferRemoteMessage message,
Function<TransferProcess, ServiceResult<TransferProcess>> action) {
return findAndLease(participantContext, message)
.compose(transferProcess -> {
if (transferProcess.shouldIgnoreIncomingMessage(message.getId())) {
return transferProcessStore.breakLease(transferProcess).flatMap(ServiceResult::from).map(it -> transferProcess);
} else {
return action.apply(transferProcess).onFailure(f -> transferProcessStore.breakLease(transferProcess));
}
});
}

private ServiceResult<Void> validateCounterParty(ClaimTokenContext claimTokenContext) {
var validation = contractValidationService.validateRequest(claimTokenContext.participantAgent(), claimTokenContext.agreement());
var participantAgent = result.getContent();
var validation = contractValidationService.validateRequest(participantAgent, contractAgreement);
if (validation.failed()) {
return ServiceResult.badRequest(validation.getFailureMessages());
}

return ServiceResult.success();
return ServiceResult.success(new ClaimTokenContext(participantAgent, contractAgreement));
}

// find and lease - write access
private ServiceResult<TransferProcess> findAndLease(ParticipantContext participantContext, TransferRemoteMessage remoteMessage) {
private ServiceResult<TransferProcess> onMessageDo(TransferRemoteMessage message,
Function<TransferProcess, ServiceResult<TransferProcess>> action) {

return transferProcessStore
.findByIdAndLease(remoteMessage.getProcessId())
.findByIdAndLease(message.getProcessId())
.flatMap(ServiceResult::from)
.compose(tp -> filterByParticipantContext(participantContext, tp));
}
.compose(transferProcess -> {
if (transferProcess.shouldIgnoreIncomingMessage(message.getId())) {
transferProcessStore.breakLease(transferProcess);
return ServiceResult.success(transferProcess);
}

private ServiceResult<TransferProcess> filterByParticipantContext(ParticipantContext participantContext, TransferProcess transferProcess) {
if (participantContext.getParticipantContextId().equals(transferProcess.getParticipantContextId())) {
return ServiceResult.success(transferProcess);
} else {
return notFound(transferProcess.getId());
}
return action.apply(transferProcess)
.onFailure(f -> transferProcessStore.breakLease(transferProcess));
});
}

private ContractAgreement findAgreement(ParticipantContext participantContext, String contractId) {
Expand All @@ -373,13 +356,7 @@
}
}

private ServiceResult<TransferProcess> findTransferProcessByIdReadOnly(ParticipantContext participantContext, String id) {
return Optional.ofNullable(transferProcessStore.findById(id))
.map(tp -> filterByParticipantContext(participantContext, tp))
.orElseGet(() -> notFound(id));
}

private ServiceResult<TransferProcess> notFound(String transferProcessId) {
private <T> ServiceResult<T> notFound(String transferProcessId) {
return ServiceResult.notFound(format("No transfer process with id %s found", transferProcessId));
}

Expand Down
Loading
Loading