controllerServices =
+ serviceFacade.getConnectorControllerServices(connectorId, processGroupId, includeAncestorGroups, includeDescendantGroups, includeReferences);
controllerServiceResource.populateRemainingControllerServiceEntitiesContent(controllerServices);
// create the response entity
@@ -2005,7 +2157,9 @@ public Response getConnectorStatus(
connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
- // get the status for the connector's managed process group
+ // Get the status for the Connector's managed Process Group. The facade method builds the status DTO from the
+ // resolved ProcessGroup directly (no DAO locate calls for Connector-managed components), so the normal
+ // Troubleshooting access gate does not apply.
final ProcessGroupStatusEntity entity = serviceFacade.getConnectorProcessGroupStatus(id, recursive);
return generateOkResponse(entity).build();
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
index 3175a306a2fe..d3d108e1d982 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowFileQueueResource.java
@@ -307,7 +307,7 @@ public Response createFlowFileListing(
serviceFacade,
requestConnectionEntity,
lookup -> {
- final ConnectionAuthorizable connAuth = lookup.getConnection(id, true);
+ final ConnectionAuthorizable connAuth = lookup.forConnectorManagedFlow().getConnection(id);
final Authorizable dataAuthorizable = connAuth.getSourceData();
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
},
@@ -374,7 +374,7 @@ public Response getListingRequest(
// authorize access
serviceFacade.authorizeAccess(lookup -> {
- final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId, true);
+ final ConnectionAuthorizable connAuth = lookup.forConnectorManagedFlow().getConnection(connectionId);
final Authorizable dataAuthorizable = connAuth.getSourceData();
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
@@ -435,7 +435,7 @@ public Response deleteListingRequest(
serviceFacade,
new ListingEntity(connectionId, listingRequestId),
lookup -> {
- final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId, true);
+ final ConnectionAuthorizable connAuth = lookup.forConnectorManagedFlow().getConnection(connectionId);
final Authorizable dataAuthorizable = connAuth.getSourceData();
dataAuthorizable.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
},
@@ -522,7 +522,7 @@ public Response createDropRequest(
serviceFacade,
requestConnectionEntity,
lookup -> {
- final ConnectionAuthorizable connAuth = lookup.getConnection(id, true);
+ final ConnectionAuthorizable connAuth = lookup.forConnectorManagedFlow().getConnection(id);
final Authorizable dataAuthorizable = connAuth.getSourceData();
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
@@ -589,7 +589,7 @@ public Response getDropRequest(
// authorize access
serviceFacade.authorizeAccess(lookup -> {
- final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId, true);
+ final ConnectionAuthorizable connAuth = lookup.forConnectorManagedFlow().getConnection(connectionId);
final Authorizable dataAuthorizable = connAuth.getSourceData();
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
});
@@ -650,7 +650,7 @@ public Response removeDropRequest(
serviceFacade,
new DropEntity(connectionId, dropRequestId),
lookup -> {
- final ConnectionAuthorizable connAuth = lookup.getConnection(connectionId, true);
+ final ConnectionAuthorizable connAuth = lookup.forConnectorManagedFlow().getConnection(connectionId);
final Authorizable dataAuthorizable = connAuth.getSourceData();
dataAuthorizable.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
index c42ff3273a01..60033bc8116e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java
@@ -885,7 +885,7 @@ public Response removeDropRequest(
}
private void authorizeHandleDropAllFlowFilesRequest(String processGroupId, AuthorizableLookup lookup) {
- final ProcessGroupAuthorizable processGroup = lookup.getProcessGroup(processGroupId, true);
+ final ProcessGroupAuthorizable processGroup = lookup.forConnectorManagedFlow().getProcessGroup(processGroupId);
authorizeProcessGroup(processGroup, authorizer, lookup, RequestAction.READ, false, false, false, false, false);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 4dfbbe89e062..ecbc773616fd 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -860,7 +860,14 @@ public ProcessorStatus getProcessorStatus(final String processorId) {
*/
public ConnectionStatus getConnectionStatus(final String connectionId) {
final ProcessGroup root = getRootGroup();
- final Connection connection = root.findConnection(connectionId);
+ Connection connection = root.findConnection(connectionId);
+
+ // If the Connection was not found by traversing the root hierarchy, fall back to a direct FlowManager lookup. This
+ // is necessary because Connections that live inside a Connector's Managed Process Group are not part of the main
+ // root Process Group's parent hierarchy, but they are still registered with the FlowManager.
+ if (connection == null) {
+ connection = flowController.getFlowManager().getConnection(connectionId);
+ }
// ensure the connection was found
if (connection == null) {
@@ -920,10 +927,8 @@ public StatusAnalytics getConnectionStatusAnalytics(final String connectionId) {
* @return the status for the specified input port
*/
public PortStatus getInputPortStatus(final String portId) {
- final ProcessGroup root = getRootGroup();
- final Port port = root.findInputPort(portId);
+ final Port port = flowController.findInputPortIncludingConnectorManaged(portId);
- // ensure the input port was found
if (port == null) {
throw new ResourceNotFoundException(String.format("Unable to locate input port with id '%s'.", portId));
}
@@ -949,10 +954,8 @@ public PortStatus getInputPortStatus(final String portId) {
* @return the status for the specified output port
*/
public PortStatus getOutputPortStatus(final String portId) {
- final ProcessGroup root = getRootGroup();
- final Port port = root.findOutputPort(portId);
+ final Port port = flowController.findOutputPortIncludingConnectorManaged(portId);
- // ensure the output port was found
if (port == null) {
throw new ResourceNotFoundException(String.format("Unable to locate output port with id '%s'.", portId));
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
index 8785d670a6c8..98caa34453ff 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java
@@ -43,15 +43,6 @@ public interface ConnectionDAO {
*/
Connection getConnection(String id);
- /**
- * Gets the specified Connection, optionally including Connector-managed ProcessGroups in the search.
- *
- * @param id The connection id
- * @param includeConnectorManaged Whether to search Connector-managed ProcessGroups
- * @return The connection
- */
- Connection getConnection(String id, boolean includeConnectorManaged);
-
/**
* Gets the specified flow file drop request.
*
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
index acf2e7f53900..e63218ae1411 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java
@@ -59,6 +59,14 @@ public interface ConnectorDAO {
void verifyCancelDrainFlowFile(String id);
+ void verifyEnterTroubleshooting(String id);
+
+ void enterTroubleshooting(String id);
+
+ void verifyEndTroubleshooting(String id);
+
+ void endTroubleshooting(String id);
+
void verifyPurgeFlowFiles(String id);
void purgeFlowFiles(String id, String requestor);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorManagedComponentLookup.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorManagedComponentLookup.java
new file mode 100644
index 000000000000..a016529eed33
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorManagedComponentLookup.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.dao;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+
+/**
+ * Facade exposing DAO-level component lookups that include Process Groups managed by a Connector.
+ *
+ * The standard {@link ProcessorDAO}, {@link ConnectionDAO}, {@link ProcessGroupDAO}, etc. intentionally hide
+ * components that live inside a Connector's managed Process Group hierarchy unless the owning Connector is in
+ * Troubleshooting mode. The handful of REST endpoints that legitimately need to inspect or operate on components
+ * within a Connector-managed flow (the FlowFile queue endpoints under {@code /connectors/{id}/...} and the
+ * Connector-aware Process Group flow endpoint, for example) obtain those components through this facade so the
+ * standard DAO surface remains free of the "include connector managed" flag.
+ */
+public interface ConnectorManagedComponentLookup {
+
+ /**
+ * Locate a Processor by identifier, including processors within Connector-managed Process Groups regardless of the
+ * owning Connector's state.
+ */
+ ProcessorNode getProcessor(String id);
+
+ /**
+ * Locate an input Port by identifier, including ports within Connector-managed Process Groups regardless of the
+ * owning Connector's state.
+ */
+ Port getInputPort(String id);
+
+ /**
+ * Locate an output Port by identifier, including ports within Connector-managed Process Groups regardless of the
+ * owning Connector's state.
+ */
+ Port getOutputPort(String id);
+
+ /**
+ * Locate a Connection by identifier, including connections within Connector-managed Process Groups regardless of
+ * the owning Connector's state.
+ */
+ Connection getConnection(String id);
+
+ /**
+ * Locate a Process Group by identifier, including Connector-managed Process Groups regardless of the owning
+ * Connector's state.
+ */
+ ProcessGroup getProcessGroup(String id);
+
+ /**
+ * Locate a Remote Process Group by identifier, including remote process groups within Connector-managed Process
+ * Groups regardless of the owning Connector's state.
+ */
+ RemoteProcessGroup getRemoteProcessGroup(String id);
+
+ /**
+ * Locate a Controller Service by identifier, including services within Connector-managed Process Groups regardless
+ * of the owning Connector's state.
+ */
+ ControllerServiceNode getControllerService(String id);
+
+ /**
+ * Locate a Label by identifier, including labels within Connector-managed Process Groups regardless of the owning
+ * Connector's state.
+ */
+ Label getLabel(String id);
+
+ /**
+ * Locate a Funnel by identifier, including funnels within Connector-managed Process Groups regardless of the
+ * owning Connector's state.
+ */
+ Funnel getFunnel(String id);
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
index 1dd36a46b88b..df546810a6c9 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ProcessGroupDAO.java
@@ -58,15 +58,6 @@ public interface ProcessGroupDAO {
*/
ProcessGroup getProcessGroup(String groupId);
- /**
- * Gets the specified process group, optionally including Connector-managed ProcessGroups in the search.
- *
- * @param groupId The process group id
- * @param includeConnectorManaged Whether to search Connector-managed ProcessGroups
- * @return The process group
- */
- ProcessGroup getProcessGroup(String groupId, boolean includeConnectorManaged);
-
/**
* Gets all of the process groups.
*
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/AbstractPortDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/AbstractPortDAO.java
index a41888bd3b98..2be03fe0f9d6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/AbstractPortDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/AbstractPortDAO.java
@@ -40,7 +40,16 @@ public abstract class AbstractPortDAO extends ComponentDAO implements PortDAO {
protected FlowController flowController;
- protected abstract Port locatePort(final String portId);
+ protected Port locatePort(final String portId) {
+ return locatePort(portId, false);
+ }
+
+ protected abstract Port locatePort(final String portId, final boolean includeConnectorManaged);
+
+ @Override
+ public Port getPort(final String portId) {
+ return locatePort(portId);
+ }
@Override
public void verifyUpdate(PortDTO portDTO) {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
index 26f0de302520..5d1e66dd7984 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/ComponentDAO.java
@@ -18,6 +18,8 @@
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
@@ -25,6 +27,7 @@
import org.apache.nifi.web.api.dto.BundleDTO;
import java.util.List;
+import java.util.Optional;
public abstract class ComponentDAO {
@@ -83,17 +86,55 @@ protected ProcessGroup locateProcessGroup(final FlowController flowController, f
return group;
}
- // Optionally search Connector-managed ProcessGroups
- if (includeConnectorManaged) {
- group = flowController.getFlowManager().getGroup(groupId);
- if (group != null) {
+ // Search Connector-managed ProcessGroups. The unconditional search is important so that if a component exists
+ // in a Connector-managed flow but the Connector is not in Troubleshooting mode, we can produce a clear 409
+ // Conflict response rather than a 404 Not Found.
+ group = flowController.getFlowManager().getGroup(groupId);
+ if (group != null) {
+ if (includeConnectorManaged) {
return group;
}
+
+ verifyAccessibleForComponentOperation(group, groupId);
+ return group;
}
throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
}
+ /**
+ * Verifies that the component represented by the given {@link ProcessGroup} (or a component contained within it) is
+ * accessible for a direct user-facing operation such as GET/PUT/POST/DELETE of the component itself. Components that
+ * live within a Connector's managed Process Group hierarchy are only accessible when the owning Connector is in
+ * {@link ConnectorState#TROUBLESHOOTING} mode. If the owning Connector is not in Troubleshooting mode, an
+ * {@link IllegalStateException} is thrown which is translated by the REST layer into a 409 Conflict response.
+ *
+ * Connector-aware REST endpoints that need to read components within a managed flow regardless of the
+ * Connector's state must obtain those components through the
+ * {@link org.apache.nifi.web.dao.ConnectorManagedComponentLookup} facade (or, for authorization, through
+ * {@link org.apache.nifi.authorization.AuthorizableLookup#forConnectorManagedFlow()}) so that this verification is
+ * skipped at the locate call site rather than being bypassed globally for the current thread.
+ *
+ * @param group the ProcessGroup that owns (or is) the component being accessed
+ * @param componentId the identifier of the component being accessed (used in the error message)
+ */
+ protected void verifyAccessibleForComponentOperation(final ProcessGroup group, final String componentId) {
+ if (group == null) {
+ return;
+ }
+
+ final Optional owningConnector = group.findOwningConnector();
+ if (owningConnector.isEmpty()) {
+ return;
+ }
+
+ final ConnectorNode connector = owningConnector.get();
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ throw new IllegalStateException("Component [" + componentId + "] is managed by Connector " + connector.getName() + " ("
+ + connector.getIdentifier() + "); the Connector must be in Troubleshooting mode for this component to be accessible.");
+ }
+ }
+
protected void verifyCreate(final ExtensionManager extensionManager, final String type, final BundleDTO bundle) {
final List bundles = extensionManager.getBundles(type);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
index 5b4ab5c14205..61681598e7bb 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java
@@ -24,6 +24,7 @@
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.connectable.Connectable;
@@ -82,8 +83,7 @@ private Connection locateConnection(final String connectionId) {
return locateConnection(connectionId, false);
}
- private Connection locateConnection(final String connectionId, final boolean includeConnectorManaged) {
- // First, search the main flow hierarchy
+ Connection locateConnection(final String connectionId, final boolean includeConnectorManaged) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
Connection connection = rootGroup.findConnection(connectionId);
@@ -91,17 +91,20 @@ private Connection locateConnection(final String connectionId, final boolean inc
return connection;
}
- // Optionally search Connector-managed ProcessGroups
- if (includeConnectorManaged) {
- for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
- final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
- if (flowContext != null) {
- final ProcessGroup managedGroup = flowContext.getManagedProcessGroup();
- connection = managedGroup.findConnection(connectionId);
- if (connection != null) {
- return connection;
- }
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext == null) {
+ continue;
+ }
+
+ final ProcessGroup managedGroup = flowContext.getManagedProcessGroup();
+ connection = managedGroup.findConnection(connectionId);
+ if (connection != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(connection.getProcessGroup(), connectionId);
}
+
+ return connection;
}
}
@@ -111,7 +114,22 @@ private Connection locateConnection(final String connectionId, final boolean inc
@Override
public boolean hasConnection(String id) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findConnection(id) != null;
+ if (rootGroup.findConnection(id) != null) {
+ return true;
+ }
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ continue;
+ }
+
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext != null && flowContext.getManagedProcessGroup().findConnection(id) != null) {
+ return true;
+ }
+ }
+
+ return false;
}
@Override
@@ -119,11 +137,6 @@ public Connection getConnection(final String id) {
return locateConnection(id);
}
- @Override
- public Connection getConnection(final String id, final boolean includeConnectorManaged) {
- return locateConnection(id, includeConnectorManaged);
- }
-
@Override
public Set getConnections(final String groupId) {
final ProcessGroup group = locateProcessGroup(flowController, groupId);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
index 9a507e16e5ce..176cee91441f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java
@@ -27,6 +27,7 @@
import org.apache.nifi.components.connector.ConnectorUpdateContext;
import org.apache.nifi.components.connector.ConnectorValueReference;
import org.apache.nifi.components.connector.ConnectorValueType;
+import org.apache.nifi.components.connector.FlowUpdateException;
import org.apache.nifi.components.connector.SecretReference;
import org.apache.nifi.components.connector.StepConfiguration;
import org.apache.nifi.components.connector.StringLiteralValue;
@@ -157,6 +158,34 @@ public void verifyCancelDrainFlowFile(final String id) {
connector.verifyCancelDrainFlowFiles();
}
+ @Override
+ public void verifyEnterTroubleshooting(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ getConnectorRepository().verifyEnterTroubleshooting(connector);
+ }
+
+ @Override
+ public void enterTroubleshooting(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ getConnectorRepository().enterTroubleshooting(connector);
+ }
+
+ @Override
+ public void verifyEndTroubleshooting(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ getConnectorRepository().verifyEndTroubleshooting(connector);
+ }
+
+ @Override
+ public void endTroubleshooting(final String id) {
+ final ConnectorNode connector = getConnector(id);
+ try {
+ getConnectorRepository().endTroubleshooting(connector);
+ } catch (final FlowUpdateException e) {
+ throw new IllegalStateException("Failed to exit troubleshooting mode for Connector " + id + ": " + e, e);
+ }
+ }
+
@Override
public void verifyPurgeFlowFiles(final String id) {
final ConnectorNode connector = requireConnector(id, ConnectorSyncMode.LOCAL_ONLY);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorManagedComponentLookup.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorManagedComponentLookup.java
new file mode 100644
index 000000000000..ef271a8f4b3a
--- /dev/null
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorManagedComponentLookup.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.dao.impl;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.web.dao.ConnectorManagedComponentLookup;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Default {@link ConnectorManagedComponentLookup} implementation. Delegates each lookup to the package-private
+ * {@code locate*(id, true)} helper on the corresponding {@code Standard*DAO}, which performs the same search the
+ * standard DAO surface performs but skips the verification that would otherwise reject access to a component inside a
+ * Connector-managed Process Group whose owning Connector is not in Troubleshooting mode.
+ */
+@Component
+public class StandardConnectorManagedComponentLookup extends ComponentDAO implements ConnectorManagedComponentLookup {
+
+ private FlowController flowController;
+ private StandardProcessorDAO processorDAO;
+ private StandardConnectionDAO connectionDAO;
+ private StandardInputPortDAO inputPortDAO;
+ private StandardOutputPortDAO outputPortDAO;
+ private StandardLabelDAO labelDAO;
+ private StandardFunnelDAO funnelDAO;
+ private StandardRemoteProcessGroupDAO remoteProcessGroupDAO;
+ private StandardControllerServiceDAO controllerServiceDAO;
+
+ @Override
+ public ProcessorNode getProcessor(final String id) {
+ return processorDAO.locateProcessor(id, true);
+ }
+
+ @Override
+ public Port getInputPort(final String id) {
+ return inputPortDAO.locatePort(id, true);
+ }
+
+ @Override
+ public Port getOutputPort(final String id) {
+ return outputPortDAO.locatePort(id, true);
+ }
+
+ @Override
+ public Connection getConnection(final String id) {
+ return connectionDAO.locateConnection(id, true);
+ }
+
+ @Override
+ public ProcessGroup getProcessGroup(final String id) {
+ return locateProcessGroup(flowController, id, true);
+ }
+
+ @Override
+ public RemoteProcessGroup getRemoteProcessGroup(final String id) {
+ return remoteProcessGroupDAO.locateRemoteProcessGroup(id, true);
+ }
+
+ @Override
+ public ControllerServiceNode getControllerService(final String id) {
+ return controllerServiceDAO.locateControllerService(id, true);
+ }
+
+ @Override
+ public Label getLabel(final String id) {
+ return labelDAO.locateLabel(id, true);
+ }
+
+ @Override
+ public Funnel getFunnel(final String id) {
+ return funnelDAO.locateFunnel(id, true);
+ }
+
+ @Autowired
+ public void setFlowController(final FlowController flowController) {
+ this.flowController = flowController;
+ }
+
+ @Autowired
+ public void setProcessorDAO(final StandardProcessorDAO processorDAO) {
+ this.processorDAO = processorDAO;
+ }
+
+ @Autowired
+ public void setConnectionDAO(final StandardConnectionDAO connectionDAO) {
+ this.connectionDAO = connectionDAO;
+ }
+
+ @Autowired
+ public void setInputPortDAO(final StandardInputPortDAO inputPortDAO) {
+ this.inputPortDAO = inputPortDAO;
+ }
+
+ @Autowired
+ public void setOutputPortDAO(final StandardOutputPortDAO outputPortDAO) {
+ this.outputPortDAO = outputPortDAO;
+ }
+
+ @Autowired
+ public void setLabelDAO(final StandardLabelDAO labelDAO) {
+ this.labelDAO = labelDAO;
+ }
+
+ @Autowired
+ public void setFunnelDAO(final StandardFunnelDAO funnelDAO) {
+ this.funnelDAO = funnelDAO;
+ }
+
+ @Autowired
+ public void setRemoteProcessGroupDAO(final StandardRemoteProcessGroupDAO remoteProcessGroupDAO) {
+ this.remoteProcessGroupDAO = remoteProcessGroupDAO;
+ }
+
+ @Autowired
+ public void setControllerServiceDAO(final StandardControllerServiceDAO controllerServiceDAO) {
+ this.controllerServiceDAO = controllerServiceDAO;
+ }
+}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
index 6c13a29d0b3e..7875221cec87 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardControllerServiceDAO.java
@@ -71,14 +71,19 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
private FlowController flowController;
private ControllerServiceNode locateControllerService(final String controllerServiceId) {
- // get the controller service
+ return locateControllerService(controllerServiceId, false);
+ }
+
+ ControllerServiceNode locateControllerService(final String controllerServiceId, final boolean includeConnectorManaged) {
final ControllerServiceNode controllerService = serviceProvider.getControllerServiceNode(controllerServiceId);
- // ensure the controller service exists
if (controllerService == null) {
throw new ResourceNotFoundException(String.format("Unable to locate controller service with id '%s'.", controllerServiceId));
}
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(controllerService.getProcessGroup(), controllerServiceId);
+ }
return controllerService;
}
@@ -116,11 +121,7 @@ public ControllerServiceNode createControllerService(final ControllerServiceDTO
if (groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS)) {
group = flowManager.getRootGroup();
} else {
- group = flowManager.getRootGroup().findProcessGroup(groupId);
- }
-
- if (group == null) {
- throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+ group = locateProcessGroup(flowController, groupId);
}
group.addControllerService(controllerService);
@@ -148,20 +149,17 @@ public Set getControllerServices(final String groupId, fi
if (groupId == null) {
return flowManager.getRootControllerServices();
- } else {
- final String searchId = groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS) ? flowManager.getRootGroupId() : groupId;
- final ProcessGroup procGroup = flowManager.getRootGroup().findProcessGroup(searchId);
- if (procGroup == null) {
- throw new ResourceNotFoundException("Could not find Process Group with ID " + groupId);
- }
+ }
- final Set serviceNodes = procGroup.getControllerServices(includeAncestorGroups);
- if (includeDescendantGroups) {
- serviceNodes.addAll(procGroup.findAllControllerServices());
- }
+ final String searchId = groupId.equals(FlowManager.ROOT_GROUP_ID_ALIAS) ? flowManager.getRootGroupId() : groupId;
+ final ProcessGroup procGroup = locateProcessGroup(flowController, searchId);
- return serviceNodes;
+ final Set serviceNodes = procGroup.getControllerServices(includeAncestorGroups);
+ if (includeDescendantGroups) {
+ serviceNodes.addAll(procGroup.findAllControllerServices());
}
+
+ return serviceNodes;
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
index 186aa09a97d6..99ea168bf6a6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFunnelDAO.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
+import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
@@ -34,20 +38,53 @@ public class StandardFunnelDAO extends ComponentDAO implements FunnelDAO {
private FlowController flowController;
private Funnel locateFunnel(final String funnelId) {
- final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- final Funnel funnel = rootGroup.findFunnel(funnelId);
+ return locateFunnel(funnelId, false);
+ }
- if (funnel == null) {
- throw new ResourceNotFoundException(String.format("Unable to find funnel with id '%s'.", funnelId));
- } else {
+ Funnel locateFunnel(final String funnelId, final boolean includeConnectorManaged) {
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+ Funnel funnel = rootGroup.findFunnel(funnelId);
+ if (funnel != null) {
return funnel;
}
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext == null) {
+ continue;
+ }
+
+ funnel = flowContext.getManagedProcessGroup().findFunnel(funnelId);
+ if (funnel != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(funnel.getProcessGroup(), funnelId);
+ }
+ return funnel;
+ }
+ }
+
+ throw new ResourceNotFoundException(String.format("Unable to find funnel with id '%s'.", funnelId));
}
@Override
public boolean hasFunnel(String funnelId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findFunnel(funnelId) != null;
+ if (rootGroup.findFunnel(funnelId) != null) {
+ return true;
+ }
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ continue;
+ }
+
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext != null && flowContext.getManagedProcessGroup().findFunnel(funnelId) != null) {
+ return true;
+ }
+ }
+
+ return false;
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
index cc84aae59a78..3929807b87ba 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardInputPortDAO.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
+import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ScheduledState;
@@ -32,25 +36,60 @@
public class StandardInputPortDAO extends AbstractPortDAO implements PortDAO {
@Override
- protected Port locatePort(final String portId) {
+ protected Port locatePort(final String portId, final boolean includeConnectorManaged) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
Port port = rootGroup.findInputPort(portId);
-
if (port == null) {
port = rootGroup.findOutputPort(portId);
}
-
- if (port == null) {
- throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId));
- } else {
+ if (port != null) {
return port;
}
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext == null) {
+ continue;
+ }
+
+ final ProcessGroup managed = flowContext.getManagedProcessGroup();
+ port = managed.findInputPort(portId);
+ if (port == null) {
+ port = managed.findOutputPort(portId);
+ }
+ if (port != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(port.getProcessGroup(), portId);
+ }
+ return port;
+ }
+ }
+
+ throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId));
}
@Override
public boolean hasPort(String portId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findInputPort(portId) != null || rootGroup.findOutputPort(portId) != null;
+ if (rootGroup.findInputPort(portId) != null || rootGroup.findOutputPort(portId) != null) {
+ return true;
+ }
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ continue;
+ }
+
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext != null) {
+ final ProcessGroup managed = flowContext.getManagedProcessGroup();
+ if (managed.findInputPort(portId) != null || managed.findOutputPort(portId) != null) {
+ return true;
+ }
+ }
+ }
+
+ return false;
}
@Override
@@ -94,11 +133,6 @@ public Port createPort(String groupId, PortDTO portDTO) {
return port;
}
- @Override
- public Port getPort(String portId) {
- return locatePort(portId);
- }
-
@Override
public Set getPorts(String groupId) {
ProcessGroup group = locateProcessGroup(flowController, groupId);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
index f1d0f80bf018..2aa7822033bb 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardLabelDAO.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
+import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.FlowController;
@@ -37,20 +41,53 @@ public class StandardLabelDAO extends ComponentDAO implements LabelDAO {
private FlowController flowController;
private Label locateLabel(final String labelId) {
- final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- final Label label = rootGroup.findLabel(labelId);
+ return locateLabel(labelId, false);
+ }
- if (label == null) {
- throw new ResourceNotFoundException(String.format("Unable to find label with id '%s'.", labelId));
- } else {
+ Label locateLabel(final String labelId, final boolean includeConnectorManaged) {
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+ Label label = rootGroup.findLabel(labelId);
+ if (label != null) {
return label;
}
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext == null) {
+ continue;
+ }
+
+ label = flowContext.getManagedProcessGroup().findLabel(labelId);
+ if (label != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(label.getProcessGroup(), labelId);
+ }
+ return label;
+ }
+ }
+
+ throw new ResourceNotFoundException(String.format("Unable to find label with id '%s'.", labelId));
}
@Override
public boolean hasLabel(String labelId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findLabel(labelId) != null;
+ if (rootGroup.findLabel(labelId) != null) {
+ return true;
+ }
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ continue;
+ }
+
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext != null && flowContext.getManagedProcessGroup().findLabel(labelId) != null) {
+ return true;
+ }
+ }
+
+ return false;
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
index 67f50489a54c..dff622bd76cd 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardOutputPortDAO.java
@@ -16,6 +16,10 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
+import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ScheduledState;
@@ -32,21 +36,50 @@
public class StandardOutputPortDAO extends AbstractPortDAO implements PortDAO {
@Override
- protected Port locatePort(final String portId) {
+ protected Port locatePort(final String portId, final boolean includeConnectorManaged) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- final Port port = rootGroup.findOutputPort(portId);
-
- if (port == null) {
- throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId));
- } else {
+ Port port = rootGroup.findOutputPort(portId);
+ if (port != null) {
return port;
}
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext == null) {
+ continue;
+ }
+
+ port = flowContext.getManagedProcessGroup().findOutputPort(portId);
+ if (port != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(port.getProcessGroup(), portId);
+ }
+ return port;
+ }
+ }
+
+ throw new ResourceNotFoundException(String.format("Unable to find port with id '%s'.", portId));
}
@Override
public boolean hasPort(String portId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findOutputPort(portId) != null;
+ if (rootGroup.findOutputPort(portId) != null) {
+ return true;
+ }
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ continue;
+ }
+
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext != null && flowContext.getManagedProcessGroup().findOutputPort(portId) != null) {
+ return true;
+ }
+ }
+
+ return false;
}
@Override
@@ -90,11 +123,6 @@ public Port createPort(String groupId, PortDTO portDTO) {
return port;
}
- @Override
- public Port getPort(String portId) {
- return locatePort(portId);
- }
-
@Override
public Set getPorts(String groupId) {
ProcessGroup group = locateProcessGroup(flowController, groupId);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
index b6612c3126d4..4bc2eb28e855 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java
@@ -187,11 +187,6 @@ public ProcessGroup getProcessGroup(String groupId) {
return locateProcessGroup(flowController, groupId);
}
- @Override
- public ProcessGroup getProcessGroup(String groupId, boolean includeConnectorManaged) {
- return locateProcessGroup(flowController, groupId, includeConnectorManaged);
- }
-
@Override
public Set getProcessGroups(final String parentGroupId, final ProcessGroupRecursivity processGroupRecursivity) {
ProcessGroup group = locateProcessGroup(flowController, parentGroupId);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
index ebdccd1c15c6..3b386f244630 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessorDAO.java
@@ -20,6 +20,7 @@
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigurableComponent;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Connection;
@@ -79,20 +80,41 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
private ComponentStateDAO componentStateDAO;
private ProcessorNode locateProcessor(final String processorId) {
+ return locateProcessor(processorId, false);
+ }
+
+ ProcessorNode locateProcessor(final String processorId, final boolean includeConnectorManaged) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
final ProcessorNode processor = rootGroup.findProcessor(processorId);
-
- if (processor == null) {
- throw new ResourceNotFoundException(String.format("Unable to find processor with id '%s'.", processorId));
- } else {
+ if (processor != null) {
return processor;
}
+
+ final ProcessorNode globalProcessor = flowController.getFlowManager().getProcessorNode(processorId);
+ if (globalProcessor != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(globalProcessor.getProcessGroup(), processorId);
+ }
+ return globalProcessor;
+ }
+
+ throw new ResourceNotFoundException(String.format("Unable to find processor with id '%s'.", processorId));
}
@Override
public boolean hasProcessor(String id) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findProcessor(id) != null;
+ if (rootGroup.findProcessor(id) != null) {
+ return true;
+ }
+
+ final ProcessorNode globalProcessor = flowController.getFlowManager().getProcessorNode(id);
+ if (globalProcessor == null) {
+ return false;
+ }
+ return globalProcessor.getProcessGroup().findOwningConnector()
+ .map(owningConnector -> owningConnector.getCurrentState() == ConnectorState.TROUBLESHOOTING)
+ .orElse(false);
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
index f8888154defa..4c8bfae96014 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardRemoteProcessGroupDAO.java
@@ -17,6 +17,10 @@
package org.apache.nifi.web.dao.impl;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
+import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.connectable.Position;
@@ -52,20 +56,53 @@ public class StandardRemoteProcessGroupDAO extends ComponentDAO implements Remot
private ComponentStateDAO componentStateDAO;
private RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId) {
- final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- final RemoteProcessGroup remoteProcessGroup = rootGroup.findRemoteProcessGroup(remoteProcessGroupId);
+ return locateRemoteProcessGroup(remoteProcessGroupId, false);
+ }
- if (remoteProcessGroup == null) {
- throw new ResourceNotFoundException(String.format("Unable to find remote process group with id '%s'.", remoteProcessGroupId));
- } else {
+ RemoteProcessGroup locateRemoteProcessGroup(final String remoteProcessGroupId, final boolean includeConnectorManaged) {
+ final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
+ RemoteProcessGroup remoteProcessGroup = rootGroup.findRemoteProcessGroup(remoteProcessGroupId);
+ if (remoteProcessGroup != null) {
return remoteProcessGroup;
}
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext == null) {
+ continue;
+ }
+
+ remoteProcessGroup = flowContext.getManagedProcessGroup().findRemoteProcessGroup(remoteProcessGroupId);
+ if (remoteProcessGroup != null) {
+ if (!includeConnectorManaged) {
+ verifyAccessibleForComponentOperation(remoteProcessGroup.getProcessGroup(), remoteProcessGroupId);
+ }
+ return remoteProcessGroup;
+ }
+ }
+
+ throw new ResourceNotFoundException(String.format("Unable to find remote process group with id '%s'.", remoteProcessGroupId));
}
@Override
public boolean hasRemoteProcessGroup(String remoteProcessGroupId) {
final ProcessGroup rootGroup = flowController.getFlowManager().getRootGroup();
- return rootGroup.findRemoteProcessGroup(remoteProcessGroupId) != null;
+ if (rootGroup.findRemoteProcessGroup(remoteProcessGroupId) != null) {
+ return true;
+ }
+
+ for (final ConnectorNode connector : flowController.getConnectorRepository().getConnectors(ConnectorSyncMode.LOCAL_ONLY)) {
+ if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
+ continue;
+ }
+
+ final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
+ if (flowContext != null && flowContext.getManagedProcessGroup().findRemoteProcessGroup(remoteProcessGroupId) != null) {
+ return true;
+ }
+ }
+
+ return false;
}
/**
@@ -108,9 +145,7 @@ public RemoteProcessGroup createRemoteProcessGroup(String groupId, RemoteProcess
*/
@Override
public RemoteProcessGroup getRemoteProcessGroup(String remoteProcessGroupId) {
- final RemoteProcessGroup remoteProcessGroup = locateRemoteProcessGroup(remoteProcessGroupId);
-
- return remoteProcessGroup;
+ return locateRemoteProcessGroup(remoteProcessGroupId);
}
/**
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/authorization/StandardAuthorizableLookupTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/authorization/StandardAuthorizableLookupTest.java
index 9a7bfc6cdced..b44bc1c13a7d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/authorization/StandardAuthorizableLookupTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/authorization/StandardAuthorizableLookupTest.java
@@ -32,6 +32,7 @@
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ConnectionDAO;
+import org.apache.nifi.web.dao.ConnectorManagedComponentLookup;
import org.apache.nifi.web.dao.FlowAnalysisRuleDAO;
import org.apache.nifi.web.dao.FlowRegistryDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
@@ -111,13 +112,13 @@ void testGetAuthorizableFromResourceFlowAnalysisRule() {
}
@Test
- void testGetConnectionWithoutIncludeConnectorManaged() {
+ void testGetConnectionResolvesThroughConnectionDAO() {
final StandardAuthorizableLookup lookup = getLookup();
final ConnectionDAO connectionDAO = mock(ConnectionDAO.class);
final Connection connection = mock(Connection.class);
final Connectable sourceConnectable = mock(Connectable.class);
- when(connectionDAO.getConnection(eq(COMPONENT_ID), eq(false))).thenReturn(connection);
+ when(connectionDAO.getConnection(eq(COMPONENT_ID))).thenReturn(connection);
when(connection.getSource()).thenReturn(sourceConnectable);
when(connection.getDestination()).thenReturn(sourceConnectable);
when(connection.getSourceAuthorizable()).thenReturn(sourceConnectable);
@@ -126,90 +127,56 @@ void testGetConnectionWithoutIncludeConnectorManaged() {
final ConnectionAuthorizable result = lookup.getConnection(COMPONENT_ID);
assertNotNull(result);
- verify(connectionDAO).getConnection(eq(COMPONENT_ID), eq(false));
+ verify(connectionDAO).getConnection(eq(COMPONENT_ID));
}
@Test
- void testGetConnectionWithIncludeConnectorManagedTrue() {
+ void testGetConnectionThroughConnectorManagedFlowFacade() {
final StandardAuthorizableLookup lookup = getLookup();
- final ConnectionDAO connectionDAO = mock(ConnectionDAO.class);
- final Connection connection = mock(Connection.class);
- final Connectable sourceConnectable = mock(Connectable.class);
-
- when(connectionDAO.getConnection(eq(COMPONENT_ID), eq(true))).thenReturn(connection);
- when(connection.getSource()).thenReturn(sourceConnectable);
- when(connection.getDestination()).thenReturn(sourceConnectable);
- when(connection.getSourceAuthorizable()).thenReturn(sourceConnectable);
- lookup.setConnectionDAO(connectionDAO);
-
- final ConnectionAuthorizable result = lookup.getConnection(COMPONENT_ID, true);
-
- assertNotNull(result);
- verify(connectionDAO).getConnection(eq(COMPONENT_ID), eq(true));
- }
-
- @Test
- void testGetConnectionWithIncludeConnectorManagedFalse() {
- final StandardAuthorizableLookup lookup = getLookup();
- final ConnectionDAO connectionDAO = mock(ConnectionDAO.class);
+ final ConnectorManagedComponentLookup connectorManagedComponentLookup = mock(ConnectorManagedComponentLookup.class);
final Connection connection = mock(Connection.class);
final Connectable sourceConnectable = mock(Connectable.class);
- when(connectionDAO.getConnection(eq(COMPONENT_ID), eq(false))).thenReturn(connection);
+ when(connectorManagedComponentLookup.getConnection(eq(COMPONENT_ID))).thenReturn(connection);
when(connection.getSource()).thenReturn(sourceConnectable);
when(connection.getDestination()).thenReturn(sourceConnectable);
when(connection.getSourceAuthorizable()).thenReturn(sourceConnectable);
- lookup.setConnectionDAO(connectionDAO);
+ lookup.setConnectorManagedComponentLookup(connectorManagedComponentLookup);
- final ConnectionAuthorizable result = lookup.getConnection(COMPONENT_ID, false);
+ final ConnectionAuthorizable result = lookup.forConnectorManagedFlow().getConnection(COMPONENT_ID);
assertNotNull(result);
- verify(connectionDAO).getConnection(eq(COMPONENT_ID), eq(false));
+ verify(connectorManagedComponentLookup).getConnection(eq(COMPONENT_ID));
}
@Test
- void testGetProcessGroupWithoutIncludeConnectorManaged() {
+ void testGetProcessGroupResolvesThroughProcessGroupDAO() {
final StandardAuthorizableLookup lookup = getLookup();
final ProcessGroupDAO processGroupDAO = mock(ProcessGroupDAO.class);
final ProcessGroup processGroup = mock(ProcessGroup.class);
- when(processGroupDAO.getProcessGroup(eq(COMPONENT_ID), eq(false))).thenReturn(processGroup);
+ when(processGroupDAO.getProcessGroup(eq(COMPONENT_ID))).thenReturn(processGroup);
lookup.setProcessGroupDAO(processGroupDAO);
final ProcessGroupAuthorizable result = lookup.getProcessGroup(COMPONENT_ID);
assertNotNull(result);
- verify(processGroupDAO).getProcessGroup(eq(COMPONENT_ID), eq(false));
+ verify(processGroupDAO).getProcessGroup(eq(COMPONENT_ID));
}
@Test
- void testGetProcessGroupWithIncludeConnectorManagedTrue() {
+ void testGetProcessGroupThroughConnectorManagedFlowFacade() {
final StandardAuthorizableLookup lookup = getLookup();
- final ProcessGroupDAO processGroupDAO = mock(ProcessGroupDAO.class);
+ final ConnectorManagedComponentLookup connectorManagedComponentLookup = mock(ConnectorManagedComponentLookup.class);
final ProcessGroup processGroup = mock(ProcessGroup.class);
- when(processGroupDAO.getProcessGroup(eq(COMPONENT_ID), eq(true))).thenReturn(processGroup);
- lookup.setProcessGroupDAO(processGroupDAO);
-
- final ProcessGroupAuthorizable result = lookup.getProcessGroup(COMPONENT_ID, true);
-
- assertNotNull(result);
- verify(processGroupDAO).getProcessGroup(eq(COMPONENT_ID), eq(true));
- }
-
- @Test
- void testGetProcessGroupWithIncludeConnectorManagedFalse() {
- final StandardAuthorizableLookup lookup = getLookup();
- final ProcessGroupDAO processGroupDAO = mock(ProcessGroupDAO.class);
- final ProcessGroup processGroup = mock(ProcessGroup.class);
-
- when(processGroupDAO.getProcessGroup(eq(COMPONENT_ID), eq(false))).thenReturn(processGroup);
- lookup.setProcessGroupDAO(processGroupDAO);
+ when(connectorManagedComponentLookup.getProcessGroup(eq(COMPONENT_ID))).thenReturn(processGroup);
+ lookup.setConnectorManagedComponentLookup(connectorManagedComponentLookup);
- final ProcessGroupAuthorizable result = lookup.getProcessGroup(COMPONENT_ID, false);
+ final ProcessGroupAuthorizable result = lookup.forConnectorManagedFlow().getProcessGroup(COMPONENT_ID);
assertNotNull(result);
- verify(processGroupDAO).getProcessGroup(eq(COMPONENT_ID), eq(false));
+ verify(connectorManagedComponentLookup).getProcessGroup(eq(COMPONENT_ID));
}
private StandardAuthorizableLookup getLookup() {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
index c1cba635aecf..0a8076239901 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/StandardNiFiServiceFacadeTest.java
@@ -130,6 +130,7 @@
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ComponentStateDAO;
import org.apache.nifi.web.dao.ConnectorDAO;
+import org.apache.nifi.web.dao.ConnectorManagedComponentLookup;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.dao.UserDAO;
@@ -144,6 +145,7 @@
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
@@ -224,6 +226,7 @@ public class StandardNiFiServiceFacadeTest {
private Authorizer authorizer;
private FlowController flowController;
private ProcessGroupDAO processGroupDAO;
+ private ConnectorManagedComponentLookup connectorManagedComponentLookup;
private RuleViolationsManager ruleViolationsManager;
@BeforeEach
@@ -250,7 +253,7 @@ public void setUp() throws Exception {
// authorizable lookup
final AuthorizableLookup authorizableLookup = mock(AuthorizableLookup.class);
- when(authorizableLookup.getProcessor(Mockito.anyString())).then(getProcessorInvocation -> {
+ final Answer processorLookupAnswer = getProcessorInvocation -> {
final String processorId = getProcessorInvocation.getArgument(0);
// processor-2 is no longer part of the flow
@@ -277,7 +280,8 @@ public Resource getResource() {
});
return componentAuthorizable;
- });
+ };
+ when(authorizableLookup.getProcessor(Mockito.anyString())).then(processorLookupAnswer);
// authorizer
authorizer = mock(Authorizer.class);
@@ -323,6 +327,7 @@ public Resource getResource() {
processGroupDAO = mock(ProcessGroupDAO.class, Answers.RETURNS_DEEP_STUBS);
ruleViolationsManager = mock(RuleViolationsManager.class);
+ connectorManagedComponentLookup = mock(ConnectorManagedComponentLookup.class);
serviceFacade = new StandardNiFiServiceFacade();
serviceFacade.setAuditService(auditService);
@@ -333,6 +338,7 @@ public Resource getResource() {
serviceFacade.setControllerFacade(controllerFacade);
serviceFacade.setProcessGroupDAO(processGroupDAO);
serviceFacade.setRuleViolationsManager(ruleViolationsManager);
+ serviceFacade.setConnectorManagedComponentLookup(connectorManagedComponentLookup);
}
@@ -1114,7 +1120,7 @@ public void testGetBulletinBoard_BulletinAuthorizedViaOwningProcessGroupWhenAuth
final ProcessorNode approvingProcessor = approvingProcessorNode();
when(processGroup.findProcessor(PROCESSOR_ID_2)).thenReturn(approvingProcessor);
- when(processGroupDAO.getProcessGroup(groupId, true)).thenReturn(processGroup);
+ when(connectorManagedComponentLookup.getProcessGroup(groupId)).thenReturn(processGroup);
final StandardNiFiServiceFacade serviceFacadeSpy = spy(serviceFacade);
final MockTestBulletinRepository bulletinRepository = new MockTestBulletinRepository();
@@ -1131,7 +1137,7 @@ public void testGetBulletinBoard_BulletinAuthorizedViaOwningProcessGroupWhenAuth
assertEquals(1, board.getBulletins().size());
assertTrue(board.getBulletins().get(0).getCanRead(),
"Bulletin board canRead should be true when the source is resolved via the owning ProcessGroup");
- verify(processGroupDAO).getProcessGroup(groupId, true);
+ verify(connectorManagedComponentLookup).getProcessGroup(groupId);
}
private ProcessGroupEntity invokeUpdateProcessGroupWithBulletin(final String groupId, final ProcessGroup processGroup,
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java
index db8a22f34e9c..20df04a91601 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardConnectionDAOTest.java
@@ -18,6 +18,7 @@
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.connectable.Connection;
@@ -34,6 +35,7 @@
import org.mockito.quality.Strictness;
import java.util.List;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -46,6 +48,7 @@
class StandardConnectionDAOTest {
private StandardConnectionDAO connectionDAO;
+ private StandardConnectorManagedComponentLookup connectorManagedComponentLookup;
@Mock
private FlowController flowController;
@@ -83,6 +86,9 @@ void setUp() {
connectionDAO = new StandardConnectionDAO();
connectionDAO.setFlowController(flowController);
+ connectorManagedComponentLookup = new StandardConnectorManagedComponentLookup();
+ connectorManagedComponentLookup.setConnectionDAO(connectionDAO);
+
when(flowController.getFlowManager()).thenReturn(flowManager);
when(flowManager.getRootGroup()).thenReturn(rootGroup);
when(flowController.getConnectorRepository()).thenReturn(connectorRepository);
@@ -92,13 +98,18 @@ void setUp() {
when(rootGroup.findConnection(CONNECTOR_CONNECTION_ID)).thenReturn(null);
when(rootGroup.findConnection(NON_EXISTENT_ID)).thenReturn(null);
- // Setup connector managed group
when(connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY)).thenReturn(List.of(connectorNode));
when(connectorNode.getActiveFlowContext()).thenReturn(frameworkFlowContext);
when(frameworkFlowContext.getManagedProcessGroup()).thenReturn(connectorManagedGroup);
when(connectorManagedGroup.findConnection(CONNECTOR_CONNECTION_ID)).thenReturn(connectorConnection);
when(connectorManagedGroup.findConnection(ROOT_CONNECTION_ID)).thenReturn(null);
when(connectorManagedGroup.findConnection(NON_EXISTENT_ID)).thenReturn(null);
+
+ // Attach the connector-managed Connection to a ProcessGroup owned by a Connector that is not in
+ // Troubleshooting mode so that access checks behave as they would for a real connector-managed component.
+ when(connectorConnection.getProcessGroup()).thenReturn(connectorManagedGroup);
+ when(connectorManagedGroup.findOwningConnector()).thenReturn(Optional.of(connectorNode));
+ when(connectorNode.getCurrentState()).thenReturn(ConnectorState.STOPPED);
}
@Test
@@ -109,44 +120,38 @@ void testGetConnectionFromRootGroup() {
}
@Test
- void testGetConnectionFromRootGroupWithIncludeConnectorManagedFalse() {
- final Connection result = connectionDAO.getConnection(ROOT_CONNECTION_ID, false);
-
- assertEquals(rootConnection, result);
- }
-
- @Test
- void testGetConnectionFromRootGroupWithIncludeConnectorManagedTrue() {
- final Connection result = connectionDAO.getConnection(ROOT_CONNECTION_ID, true);
+ void testConnectorManagedLookupReturnsRootConnection() {
+ final Connection result = connectorManagedComponentLookup.getConnection(ROOT_CONNECTION_ID);
assertEquals(rootConnection, result);
}
@Test
- void testGetConnectionFromConnectorManagedGroupWithIncludeConnectorManagedTrue() {
- final Connection result = connectionDAO.getConnection(CONNECTOR_CONNECTION_ID, true);
+ void testConnectorManagedLookupReturnsConnectorConnectionRegardlessOfConnectorState() {
+ final Connection result = connectorManagedComponentLookup.getConnection(CONNECTOR_CONNECTION_ID);
assertEquals(connectorConnection, result);
}
@Test
- void testGetConnectionFromConnectorManagedGroupWithIncludeConnectorManagedFalseThrows() {
- assertThrows(ResourceNotFoundException.class, () ->
- connectionDAO.getConnection(CONNECTOR_CONNECTION_ID, false)
+ void testGetConnectionOnConnectorManagedConnectionThrowsWhenConnectorNotTroubleshooting() {
+ assertThrows(IllegalStateException.class, () ->
+ connectionDAO.getConnection(CONNECTOR_CONNECTION_ID)
);
}
@Test
- void testGetConnectionWithDefaultDoesNotFindConnectorManagedConnection() {
- assertThrows(ResourceNotFoundException.class, () ->
- connectionDAO.getConnection(CONNECTOR_CONNECTION_ID)
- );
+ void testGetConnectionFromConnectorManagedConnectionInTroubleshootingReturnsConnection() {
+ when(connectorNode.getCurrentState()).thenReturn(ConnectorState.TROUBLESHOOTING);
+
+ final Connection result = connectionDAO.getConnection(CONNECTOR_CONNECTION_ID);
+ assertEquals(connectorConnection, result);
}
@Test
void testGetConnectionWithNonExistentIdThrows() {
assertThrows(ResourceNotFoundException.class, () ->
- connectionDAO.getConnection(NON_EXISTENT_ID, true)
+ connectorManagedComponentLookup.getConnection(NON_EXISTENT_ID)
);
}
@@ -172,7 +177,7 @@ void testGetConnectionFromConnectorWithNullActiveFlowContext() {
when(connectorNode.getActiveFlowContext()).thenReturn(null);
assertThrows(ResourceNotFoundException.class, () ->
- connectionDAO.getConnection(CONNECTOR_CONNECTION_ID, true)
+ connectorManagedComponentLookup.getConnection(CONNECTOR_CONNECTION_ID)
);
}
@@ -190,7 +195,7 @@ void testGetConnectionWithMultipleConnectors() {
when(flowContext2.getManagedProcessGroup()).thenReturn(managedGroup2);
when(managedGroup2.findConnection(secondConnectorConnectionId)).thenReturn(connectionInSecondConnector);
- final Connection result = connectionDAO.getConnection(secondConnectorConnectionId, true);
+ final Connection result = connectorManagedComponentLookup.getConnection(secondConnectorConnectionId);
assertEquals(connectionInSecondConnector, result);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAOTest.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAOTest.java
index cf5441f199ab..9e6767f3239d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAOTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/test/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAOTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.web.dao.impl;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorState;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.groups.ProcessGroup;
@@ -28,6 +30,8 @@
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.util.Optional;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;
@@ -37,6 +41,7 @@
class StandardProcessGroupDAOTest {
private StandardProcessGroupDAO processGroupDAO;
+ private StandardConnectorManagedComponentLookup connectorManagedComponentLookup;
@Mock
private FlowController flowController;
@@ -50,6 +55,9 @@ class StandardProcessGroupDAOTest {
@Mock
private ProcessGroup connectorManagedGroup;
+ @Mock
+ private ConnectorNode owningConnector;
+
private static final String ROOT_GROUP_ID = "root-group-id";
private static final String CONNECTOR_GROUP_ID = "connector-group-id";
private static final String NON_EXISTENT_ID = "non-existent-id";
@@ -59,17 +67,21 @@ void setUp() {
processGroupDAO = new StandardProcessGroupDAO();
processGroupDAO.setFlowController(flowController);
+ connectorManagedComponentLookup = new StandardConnectorManagedComponentLookup();
+ connectorManagedComponentLookup.setFlowController(flowController);
+
when(flowController.getFlowManager()).thenReturn(flowManager);
- // Setup root group lookup (non-connector managed)
when(flowManager.getGroup(ROOT_GROUP_ID, null)).thenReturn(rootGroup);
when(flowManager.getGroup(CONNECTOR_GROUP_ID, null)).thenReturn(null);
when(flowManager.getGroup(NON_EXISTENT_ID, null)).thenReturn(null);
- // Setup connector-managed group lookup (includes all groups)
when(flowManager.getGroup(ROOT_GROUP_ID)).thenReturn(rootGroup);
when(flowManager.getGroup(CONNECTOR_GROUP_ID)).thenReturn(connectorManagedGroup);
when(flowManager.getGroup(NON_EXISTENT_ID)).thenReturn(null);
+
+ when(connectorManagedGroup.findOwningConnector()).thenReturn(Optional.of(owningConnector));
+ when(owningConnector.getCurrentState()).thenReturn(ConnectorState.STOPPED);
}
@Test
@@ -80,51 +92,45 @@ void testGetProcessGroupFromRootHierarchy() {
}
@Test
- void testGetProcessGroupFromRootHierarchyWithIncludeConnectorManagedFalse() {
- final ProcessGroup result = processGroupDAO.getProcessGroup(ROOT_GROUP_ID, false);
-
- assertEquals(rootGroup, result);
- }
-
- @Test
- void testGetProcessGroupFromRootHierarchyWithIncludeConnectorManagedTrue() {
- final ProcessGroup result = processGroupDAO.getProcessGroup(ROOT_GROUP_ID, true);
+ void testConnectorManagedLookupReturnsRootGroup() {
+ final ProcessGroup result = connectorManagedComponentLookup.getProcessGroup(ROOT_GROUP_ID);
assertEquals(rootGroup, result);
}
@Test
- void testGetProcessGroupFromConnectorManagedWithIncludeConnectorManagedTrue() {
- final ProcessGroup result = processGroupDAO.getProcessGroup(CONNECTOR_GROUP_ID, true);
+ void testConnectorManagedLookupReturnsConnectorManagedGroupRegardlessOfConnectorState() {
+ final ProcessGroup result = connectorManagedComponentLookup.getProcessGroup(CONNECTOR_GROUP_ID);
assertEquals(connectorManagedGroup, result);
}
@Test
- void testGetProcessGroupFromConnectorManagedWithIncludeConnectorManagedFalseThrows() {
- assertThrows(ResourceNotFoundException.class, () ->
- processGroupDAO.getProcessGroup(CONNECTOR_GROUP_ID, false)
+ void testGetProcessGroupOnConnectorManagedGroupThrowsWhenConnectorNotTroubleshooting() {
+ assertThrows(IllegalStateException.class, () ->
+ processGroupDAO.getProcessGroup(CONNECTOR_GROUP_ID)
);
}
@Test
- void testGetProcessGroupWithDefaultDoesNotFindConnectorManagedGroup() {
- assertThrows(ResourceNotFoundException.class, () ->
- processGroupDAO.getProcessGroup(CONNECTOR_GROUP_ID)
- );
+ void testGetProcessGroupFromConnectorManagedWhenInTroubleshootingReturnsGroup() {
+ when(owningConnector.getCurrentState()).thenReturn(ConnectorState.TROUBLESHOOTING);
+
+ final ProcessGroup result = processGroupDAO.getProcessGroup(CONNECTOR_GROUP_ID);
+ assertEquals(connectorManagedGroup, result);
}
@Test
- void testGetProcessGroupWithNonExistentIdThrows() {
+ void testGetProcessGroupWithNonExistentIdThrowsThroughConnectorManagedLookup() {
assertThrows(ResourceNotFoundException.class, () ->
- processGroupDAO.getProcessGroup(NON_EXISTENT_ID, true)
+ connectorManagedComponentLookup.getProcessGroup(NON_EXISTENT_ID)
);
}
@Test
- void testGetProcessGroupWithNonExistentIdAndIncludeConnectorManagedFalseThrows() {
+ void testGetProcessGroupWithNonExistentIdThrows() {
assertThrows(ResourceNotFoundException.class, () ->
- processGroupDAO.getProcessGroup(NON_EXISTENT_ID, false)
+ processGroupDAO.getProcessGroup(NON_EXISTENT_ID)
);
}
}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/AssetConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/AssetConnector.java
index 2e927bb038a7..4b710ae53bc5 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/AssetConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/AssetConnector.java
@@ -63,6 +63,12 @@ public VersionedExternalFlow getInitialFlow() {
return null;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // This Connector does not manage a flow; there is no authoritative Active flow.
+ return null;
+ }
+
@Override
public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException {
// No-op: this connector does not manipulate the flow.
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java
index f15d668ef51f..9106e9eeb5fd 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/BundleResolutionConnector.java
@@ -110,6 +110,13 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // After configuration, the authoritative flow is the one produced by createFlowWithBundleScenarios,
+ // which is what applyUpdate installs into the managed Process Group.
+ return createFlowWithBundleScenarios();
+ }
+
private VersionedExternalFlow createFlowWithBundleScenarios() {
final VersionedProcessGroup group = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Bundle Resolution Flow");
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
index bb767219154d..c5ac49bb9afc 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/CalculateConnector.java
@@ -172,6 +172,13 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // The flow structure does not depend on the Connector's configuration; applyUpdate only invokes a
+ // processor method and writes output to a file, it does not modify the flow itself.
+ return getInitialFlow();
+ }
+
@Override
public List getConfigurationSteps() {
return List.of(CALCULATION_STEP);
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
index 81c6af171b13..9eddbf597e02 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ComponentLifecycleConnector.java
@@ -39,7 +39,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
/**
* Test Connector designed to verify the complete component lifecycle.
@@ -54,6 +53,7 @@
public class ComponentLifecycleConnector extends AbstractConnector {
private static final Bundle SYSTEM_TEST_EXTENSIONS_BUNDLE = new Bundle("org.apache.nifi", "nifi-system-test-extensions-nar", "2.8.0-SNAPSHOT");
+ private static final String ROOT_GROUP_ID = "component-lifecycle-root-group";
@Override
protected void onStepConfigured(final String stepName, final FlowContext workingContext) {
@@ -67,8 +67,14 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // This Connector's flow is fully determined statically and does not depend on runtime configuration.
+ return getInitialFlow();
+ }
+
private VersionedProcessGroup createRootGroup() {
- final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(UUID.randomUUID().toString(), "Component Lifecycle Root");
+ final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(ROOT_GROUP_ID, "Component Lifecycle Root");
rootGroup.setPosition(new Position(0, 0));
rootGroup.setRemoteProcessGroups(new HashSet<>());
rootGroup.setScheduledState(ScheduledState.ENABLED);
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
index 1c50941c8371..8d331bab4306 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/DataQueuingConnector.java
@@ -63,6 +63,12 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // This Connector's flow is static and does not depend on runtime configuration.
+ return getInitialFlow();
+ }
+
@Override
public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) {
return List.of();
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
index 8c7f680d86ba..9abcbf87c59a 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/GatedDataQueuingConnector.java
@@ -94,6 +94,21 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // The authoritative flow is the initial flow with the currently configured Gate File Path applied to the
+ // TerminateFlowFile processor. This mirrors the behavior of applyUpdate so that exiting Troubleshooting
+ // restores a flow equivalent to re-applying the active configuration.
+ final VersionedExternalFlow flow = getInitialFlow();
+ final String gateFilePath = activeFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, GATE_FILE_PATH).getValue();
+ if (gateFilePath != null) {
+ VersionedFlowUtils.findProcessor(flow.getFlowContents(), processor -> processor.getType().endsWith("TerminateFlowFile"))
+ .ifPresent(processor -> processor.getProperties().put("Gate File", gateFilePath));
+ }
+
+ return flow;
+ }
+
@Override
public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) {
return List.of();
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NestedProcessGroupConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NestedProcessGroupConnector.java
index e3903a38f1de..1070e28dc8c3 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NestedProcessGroupConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NestedProcessGroupConnector.java
@@ -58,6 +58,12 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // This Connector's flow structure is static and does not depend on runtime configuration.
+ return getInitialFlow();
+ }
+
@Override
public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) {
return List.of();
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NopConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NopConnector.java
index 3d28240ca256..2409c3200465 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NopConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/NopConnector.java
@@ -100,6 +100,12 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // No-op Connector: the authoritative flow is always the initial flow and does not depend on configuration.
+ return getInitialFlow();
+ }
+
@Override
public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) {
return List.of(new ConfigVerificationResult.Builder()
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java
index 9d160a11f1c2..fc2047c444dc 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/ParameterContextConnector.java
@@ -134,6 +134,21 @@ public VersionedExternalFlow getInitialFlow() {
return createEmptyFlow();
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // The authoritative Active flow is the fully-constructed flow produced by applying the current
+ // configuration values. Before configuration has been applied, fall back to the initial empty flow.
+ final String sensitiveValue = activeFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, SENSITIVE_VALUE).getValue();
+ final String assetFilePath = activeFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, ASSET_FILE).getValue();
+ if (sensitiveValue == null || assetFilePath == null) {
+ return createEmptyFlow();
+ }
+
+ final String sensitiveOutputFile = activeFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, SENSITIVE_OUTPUT_FILE).getValue();
+ final String assetOutputFile = activeFlowContext.getConfigurationContext().getProperty(CONFIG_STEP, ASSET_OUTPUT_FILE).getValue();
+ return createFlow(sensitiveValue, assetFilePath, sensitiveOutputFile, assetOutputFile);
+ }
+
private VersionedExternalFlow createEmptyFlow() {
final VersionedProcessGroup rootGroup = VersionedFlowUtils.createProcessGroup(ROOT_GROUP_ID, "Parameter Context Test Flow");
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java
index 441f4bc3d47f..61d9eabb8958 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/connectors/tests/system/SelectiveDropConnector.java
@@ -83,6 +83,12 @@ public VersionedExternalFlow getInitialFlow() {
return flow;
}
+ @Override
+ public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
+ // This Connector's flow is fully determined statically and does not depend on runtime configuration.
+ return getInitialFlow();
+ }
+
@Override
public List verifyConfigurationStep(final String stepName, final Map propertyValueOverrides, final FlowContext flowContext) {
return List.of();
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 8960e202ce3e..02c020df9a1e 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -426,8 +426,59 @@ public void stopConnectors() throws NiFiClientException, IOException, Interrupte
final ConnectorsEntity connectorsEntity = nifiClient.getFlowClient().getConnectors();
for (final ConnectorEntity connector : connectorsEntity.getConnectors()) {
connector.setDisconnectedNodeAcknowledged(true);
- getConnectorClient().stopConnector(connector);
- waitForConnectorStopped(connector.getId());
+ final String state = connector.getComponent() == null ? null : connector.getComponent().getState();
+
+ if (ConnectorState.TROUBLESHOOTING.name().equals(state)) {
+ final String managedGroupId = connector.getComponent().getManagedProcessGroupId();
+ if (managedGroupId != null) {
+ try {
+ stopProcessGroupComponents(managedGroupId);
+ disableControllerServices(managedGroupId, true);
+ emptyConnectorManagedQueues(connector.getId(), managedGroupId);
+ } catch (final Exception stopException) {
+ logger.warn("Failed to prepare managed Process Group [{}] for Connector [{}] during teardown",
+ managedGroupId, connector.getId(), stopException);
+ }
+ }
+
+ try {
+ endTroubleshooting(connector.getId());
+ } catch (final Exception endException) {
+ logger.warn("Failed to end Troubleshooting for Connector [{}] during teardown", connector.getId(), endException);
+ }
+
+ continue;
+ }
+
+ try {
+ getConnectorClient().stopConnector(connector);
+ waitForConnectorStopped(connector.getId());
+ } catch (final Exception stopException) {
+ logger.warn("Failed to stop Connector [{}] during teardown", connector.getId(), stopException);
+ }
+ }
+ }
+
+ private void emptyConnectorManagedQueues(final String connectorId, final String groupId) throws NiFiClientException, IOException {
+ final ProcessGroupFlowEntity flowEntity = getConnectorClient().getFlow(connectorId, groupId);
+ if (flowEntity == null || flowEntity.getProcessGroupFlow() == null || flowEntity.getProcessGroupFlow().getFlow() == null) {
+ return;
+ }
+
+ final FlowDTO flow = flowEntity.getProcessGroupFlow().getFlow();
+ if (flow.getConnections() != null) {
+ for (final ConnectionEntity connection : flow.getConnections()) {
+ try {
+ emptyQueue(connection.getId());
+ } catch (final Exception ignored) {
+ }
+ }
+ }
+
+ if (flow.getProcessGroups() != null) {
+ for (final ProcessGroupEntity child : flow.getProcessGroups()) {
+ emptyConnectorManagedQueues(connectorId, child.getId());
+ }
}
}
@@ -490,6 +541,21 @@ public void waitForConnectorDraining(final String connectorId) throws NiFiClient
waitForConnectorState(connectorId, ConnectorState.DRAINING);
}
+ public ConnectorEntity enterTroubleshooting(final String connectorId) throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity entity = getConnectorClient().getConnector(connectorId);
+ entity.setDisconnectedNodeAcknowledged(true);
+ final ConnectorEntity result = getConnectorClient().enterTroubleshooting(entity);
+ waitForConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+ return result;
+ }
+
+ public ConnectorEntity endTroubleshooting(final String connectorId) throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity entity = getConnectorClient().getConnector(connectorId);
+ final ConnectorEntity result = getConnectorClient().endTroubleshooting(connectorId, entity.getRevision().getClientId(), entity.getRevision().getVersion());
+ waitForConnectorStopped(connectorId);
+ return result;
+ }
+
public ParameterProviderEntity createParameterProvider(final String simpleTypeName) throws NiFiClientException, IOException {
return createParameterProvider(NiFiSystemIT.TEST_PARAM_PROVIDERS_PACKAGE + "." + simpleTypeName, NiFiSystemIT.NIFI_GROUP_ID, NiFiSystemIT.TEST_EXTENSIONS_ARTIFACT_ID, nifiVersion);
}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorTroubleshootingIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorTroubleshootingIT.java
new file mode 100644
index 000000000000..7c73e0800ee8
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ClusteredConnectorTroubleshootingIT.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import org.apache.nifi.tests.system.NiFiInstanceFactory;
+
+/**
+ * Clustered variant of {@link ConnectorTroubleshootingIT}. All Troubleshooting lifecycle tests defined on the parent
+ * class are re-executed against a two-node NiFi cluster so that the cluster-replication and node-restart paths are
+ * exercised in addition to the standalone paths.
+ *
+ * This class deliberately does not redefine any of the parent's test methods. The verification of restart-while-in-
+ * Troubleshooting and authoritative-flow restoration that already exists in
+ * {@link ConnectorTroubleshootingIT#testConfigurationAndAuthoritativeFlowRestoredAfterTroubleshootingRestart()} (and
+ * the related restart tests) is the clustered coverage the user requested. Adding the override of
+ * {@link #getInstanceFactory()} below is sufficient to run all of those scenarios on a two-node cluster.
+ */
+public class ClusteredConnectorTroubleshootingIT extends ConnectorTroubleshootingIT {
+
+ @Override
+ public NiFiInstanceFactory getInstanceFactory() {
+ return createTwoNodeInstanceFactory();
+ }
+}
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
index 3dec256e24f5..38a0d1c1fd7f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorCrudIT.java
@@ -17,7 +17,8 @@
package org.apache.nifi.tests.system.connectors;
-import jakarta.ws.rs.NotFoundException;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.connector.BundleCompatibility;
import org.apache.nifi.components.connector.ConnectorState;
@@ -194,12 +195,8 @@ public void testProcessGroupAccessibility() throws NiFiClientException, IOExcept
assertNotNull(connectorFlow);
assertEquals(managedProcessGroupId, connectorFlow.getId());
- try {
- getNifiClient().getFlowClient().getProcessGroup(managedProcessGroupId);
- fail("Was able to retrieve connector-managed process group via FlowClient");
- } catch (final NiFiClientException e) {
- assertInstanceOf(NotFoundException.class, e.getCause());
- }
+ assertConnectorManagedAccessRejected(() -> getNifiClient().getFlowClient().getProcessGroup(managedProcessGroupId),
+ "Was able to retrieve connector-managed process group via FlowClient");
final Set childGroups = connectorFlow.getFlow().getProcessGroups();
assertEquals(1, childGroups.size(), "Expected exactly one child process group");
@@ -212,14 +209,25 @@ public void testProcessGroupAccessibility() throws NiFiClientException, IOExcept
assertNotNull(childFlowEntity);
assertEquals(childGroupId, childFlowEntity.getProcessGroupFlow().getId());
+ assertConnectorManagedAccessRejected(() -> getNifiClient().getFlowClient().getProcessGroup(childGroupId),
+ "Was able to retrieve child process group of connector-managed flow via FlowClient");
+ }
+
+ private void assertConnectorManagedAccessRejected(final FlowClientCall call, final String failureMessage) throws IOException {
try {
- getNifiClient().getFlowClient().getProcessGroup(childGroupId);
- fail("Was able to retrieve child process group of connector-managed flow via FlowClient");
+ call.invoke();
+ fail(failureMessage);
} catch (final NiFiClientException e) {
- assertInstanceOf(NotFoundException.class, e.getCause());
+ final WebApplicationException cause = assertInstanceOf(WebApplicationException.class, e.getCause());
+ assertEquals(Response.Status.CONFLICT.getStatusCode(), cause.getResponse().getStatus());
}
}
+ @FunctionalInterface
+ private interface FlowClientCall {
+ void invoke() throws NiFiClientException, IOException;
+ }
+
@Test
public void testBundleResolutionRequireExactBundle() throws NiFiClientException, IOException, InterruptedException {
final ConnectorAndProcessor connectorAndProcessor = createBundleResolutionConnector(BundleCompatibility.REQUIRE_EXACT_BUNDLE);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorTroubleshootingIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorTroubleshootingIT.java
new file mode 100644
index 000000000000..e6cd77dab2da
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/connectors/ConnectorTroubleshootingIT.java
@@ -0,0 +1,1021 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.connectors;
+
+import jakarta.ws.rs.WebApplicationException;
+import org.apache.nifi.components.connector.ConnectorState;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.dto.AssetReferenceDTO;
+import org.apache.nifi.web.api.dto.ConnectorConfigurationDTO;
+import org.apache.nifi.web.api.dto.ConnectorValueReferenceDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.flow.FlowDTO;
+import org.apache.nifi.web.api.entity.AssetEntity;
+import org.apache.nifi.web.api.entity.AssetsEntity;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectorEntity;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.apache.nifi.web.api.entity.ParameterProviderEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * System tests that validate the Troubleshooting lifecycle of Connectors.
+ */
+public class ConnectorTroubleshootingIT extends NiFiSystemIT {
+
+ /**
+ * Transition a Connector into Troubleshooting, modify a processor inside the managed flow, then transition back
+ * out. The Connector's authoritative flow should be restored on exit and the Connector should start smoothly.
+ */
+ @Test
+ public void testEnterAndExitTroubleshootingRestoresFlow() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ final List originalProcessors = findAllProcessors(connectorId);
+ assertFalse(originalProcessors.isEmpty(), "Managed flow should contain processors");
+ final ProcessorEntity originalProcessor = originalProcessors.get(0);
+ final String originalSchedulingPeriod = originalProcessor.getComponent().getConfig().getSchedulingPeriod();
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ final ProcessorEntity fetchedProcessor = getNifiClient().getProcessorClient().getProcessor(originalProcessor.getId());
+ assertNotNull(fetchedProcessor);
+
+ fetchedProcessor.getComponent().getConfig().setSchedulingPeriod("42 sec");
+ final ProcessorEntity updatedProcessor = getNifiClient().getProcessorClient().updateProcessor(fetchedProcessor);
+ assertEquals("42 sec", updatedProcessor.getComponent().getConfig().getSchedulingPeriod());
+
+ getClientUtil().endTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.STOPPED);
+
+ final ProcessorEntity restoredProcessor = findProcessorByName(connectorId, originalProcessor.getComponent().getName());
+ assertNotNull(restoredProcessor, "Expected original processor to be restored by authoritative flow");
+ assertEquals(originalSchedulingPeriod, restoredProcessor.getComponent().getConfig().getSchedulingPeriod(),
+ "Scheduling period should be restored to authoritative value");
+
+ getClientUtil().startConnector(connectorId);
+ assertConnectorState(connectorId, ConnectorState.RUNNING);
+ }
+
+ /**
+ * Transition into Troubleshooting, add a new Connection, queue up data in that Connection, and verify that ending
+ * Troubleshooting fails with a 409. Restart NiFi and verify the data is still queued. Drop all FlowFiles, then end
+ * Troubleshooting successfully.
+ */
+ @Test
+ public void testEndTroubleshootingBlockedByQueuedData() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ getClientUtil().enterTroubleshooting(connectorId);
+
+ final String managedGroupId = getNifiClient().getConnectorClient().getConnector(connectorId).getComponent().getManagedProcessGroupId();
+ assertNotNull(managedGroupId);
+
+ final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile", managedGroupId);
+ final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", managedGroupId);
+ getClientUtil().updateProcessorProperties(generate, Map.of("File Size", "10 B"));
+ getClientUtil().updateProcessorSchedulingPeriod(generate, "10 ms");
+
+ final ConnectionEntity connection = getClientUtil().createConnection(generate, terminate, "success", managedGroupId);
+ final String connectionId = connection.getId();
+
+ getClientUtil().startProcessor(generate);
+ waitForQueuedFlowFiles(connectorId, connectionId, 1);
+ getClientUtil().stopProcessor(generate);
+
+ try {
+ getClientUtil().endTroubleshooting(connectorId);
+ fail("Expected endTroubleshooting to fail with 409 while connection has queued data");
+ } catch (final NiFiClientException e) {
+ assertConflict(e);
+ }
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ restartNiFi();
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+ final OptionalInt queued = getQueuedCount(connectorId, connectionId);
+ assertTrue(queued.isPresent(), "Connection must still be present after restart");
+ assertTrue(queued.getAsInt() > 0, "Queued data must survive restart");
+
+ getClientUtil().emptyQueue(connectionId);
+ waitForQueuedFlowFiles(connectorId, connectionId, 0);
+
+ getClientUtil().endTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.STOPPED);
+
+ final List processorIdsAfterRestore = findAllProcessors(connectorId).stream()
+ .map(ProcessorEntity::getId)
+ .toList();
+
+ assertFalse(processorIdsAfterRestore.contains(generate.getId()), "GenerateFlowFile processor added in Troubleshooting should be removed once authoritative flow is restored");
+ assertFalse(processorIdsAfterRestore.contains(terminate.getId()), "TerminateFlowFile processor added in Troubleshooting should be removed once authoritative flow is restored");
+ assertFalse(getQueuedCount(connectorId, connectionId).isPresent(), "Connection added in Troubleshooting should be removed once authoritative flow is restored");
+ }
+
+ /**
+ * Verify that GET/PUT/POST/DELETE on a processor that lives within a Connector's managed flow is denied with a 409
+ * when the Connector is not in Troubleshooting mode, and succeeds once it is.
+ */
+ @Test
+ public void testComponentAccessBlockedWhenNotInTroubleshooting() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ final List processors = findAllProcessors(connectorId);
+ assertFalse(processors.isEmpty());
+ final String processorId = processors.get(0).getId();
+
+ try {
+ getNifiClient().getProcessorClient().getProcessor(processorId);
+ fail("Expected 409 Conflict retrieving processor managed by Connector when not in Troubleshooting");
+ } catch (final NiFiClientException e) {
+ assertConflict(e);
+ }
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ final ProcessorEntity processor = getNifiClient().getProcessorClient().getProcessor(processorId);
+ assertNotNull(processor);
+ assertEquals(processorId, processor.getId());
+ }
+
+ /**
+ * Stop a connector, enter Troubleshooting, start processors inside the managed flow, restart NiFi, and ensure the
+ * processors are still running and the Connector remains in Troubleshooting.
+ */
+ @Test
+ public void testProcessorsInTroubleshootingStillRunningAfterRestart() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ getClientUtil().enterTroubleshooting(connectorId);
+
+ final List processors = findAllProcessors(connectorId);
+ assertFalse(processors.isEmpty());
+
+ final List startedProcessorIds = new ArrayList<>();
+ for (final ProcessorEntity processor : processors) {
+ if (ScheduledState.DISABLED.name().equals(processor.getComponent().getState())) {
+ continue;
+ }
+
+ try {
+ getClientUtil().startProcessor(processor);
+ startedProcessorIds.add(processor.getId());
+ } catch (final Exception ignored) {
+ }
+ }
+
+ assertFalse(startedProcessorIds.isEmpty(), "Expected at least one processor to be started");
+
+ for (final String id : startedProcessorIds) {
+ waitForProcessorState(id, ScheduledState.RUNNING);
+ }
+
+ restartNiFi();
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+ for (final String id : startedProcessorIds) {
+ waitForProcessorState(id, ScheduledState.RUNNING);
+ }
+ }
+
+ /**
+ * A running Connector must be able to transition into
+ * Troubleshooting mode without the framework stopping any of the components inside the managed flow. Any component
+ * that was RUNNING prior to entering Troubleshooting must remain RUNNING after the transition, and must also
+ * survive a restart of NiFi while the Connector stays in Troubleshooting.
+ */
+ @Test
+ public void testEnterTroubleshootingFromRunningKeepsProcessorsRunning() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+ getClientUtil().startConnector(connectorId);
+ assertConnectorState(connectorId, ConnectorState.RUNNING);
+
+ final List runningBeforeTroubleshooting = findProcessorsInState(connectorId, ScheduledState.RUNNING);
+ assertFalse(runningBeforeTroubleshooting.isEmpty(), "Expected at least one processor to be RUNNING before entering Troubleshooting");
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ for (final ProcessorEntity processor : runningBeforeTroubleshooting) {
+ final ProcessorEntity refreshed = getNifiClient().getProcessorClient().getProcessor(processor.getId());
+ assertEquals(ScheduledState.RUNNING.name(), refreshed.getComponent().getState(),
+ "Processor " + refreshed.getComponent().getName() + " must remain RUNNING after entering Troubleshooting");
+ }
+
+ restartNiFi();
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+ for (final ProcessorEntity processor : runningBeforeTroubleshooting) {
+ waitForProcessorState(processor.getId(), ScheduledState.RUNNING);
+ }
+ }
+
+ /**
+ * While a Connector is in Troubleshooting mode, lifecycle operations on the Connector itself (start, stop,
+ * applyUpdate, drain, purge, and delete) must be blocked with a 409 Conflict. Once Troubleshooting is exited those
+ * operations become available again.
+ */
+ @Test
+ public void testConnectorLifecycleBlockedDuringTroubleshooting() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ assertConflictExpected("startConnector", () -> {
+ final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId);
+ getNifiClient().getConnectorClient().startConnector(entity);
+ });
+
+ assertConflictExpected("stopConnector", () -> {
+ final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId);
+ getNifiClient().getConnectorClient().stopConnector(entity);
+ });
+
+ assertConflictExpected("applyUpdate", () -> {
+ final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId);
+ getNifiClient().getConnectorClient().applyUpdate(entity);
+ });
+
+ assertConflictExpected("drainConnector", () -> {
+ final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId);
+ getNifiClient().getConnectorClient().drainConnector(entity);
+ });
+
+ assertConflictExpected("createPurgeRequest", () -> getNifiClient().getConnectorClient().createPurgeRequest(connectorId));
+
+ assertConflictExpected("deleteConnector", () -> {
+ final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId);
+ getNifiClient().getConnectorClient().deleteConnector(entity);
+ });
+
+ getClientUtil().endTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.STOPPED);
+
+ getClientUtil().startConnector(connectorId);
+ assertConnectorState(connectorId, ConnectorState.RUNNING);
+ }
+
+ /**
+ * Verify that non-Processor component types inside a Connector's managed flow are reachable through the standard
+ * component REST endpoints while the Connector is in Troubleshooting mode, and that access is blocked with a 409
+ * Conflict when the Connector is not in Troubleshooting mode. Covers Connections, Ports (input and output),
+ * ControllerServices, and child ProcessGroups.
+ */
+ @Test
+ public void testNonProcessorComponentsInManagedFlow() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ // Enter Troubleshooting briefly to discover the IDs of components inside the managed flow. This is the only
+ // straightforward way to obtain the ControllerService ID; the rest are discoverable via the Connector's flow
+ // endpoint, but using Troubleshooting here keeps the discovery uniform.
+ getClientUtil().enterTroubleshooting(connectorId);
+ final String connectionId = findFirstConnectionId(connectorId);
+ final String inputPortId = findFirstInputPortId(connectorId);
+ final String outputPortId = findFirstOutputPortId(connectorId);
+ final String controllerServiceId = findFirstControllerServiceId(connectorId);
+ final String childGroupId = findFirstChildProcessGroupId(connectorId);
+
+ assertNotNull(connectionId, "Managed flow should contain at least one Connection");
+ assertNotNull(inputPortId, "Managed flow should contain at least one InputPort");
+ assertNotNull(outputPortId, "Managed flow should contain at least one OutputPort");
+ assertNotNull(controllerServiceId, "Managed flow should contain at least one ControllerService");
+ assertNotNull(childGroupId, "Managed flow should contain at least one child ProcessGroup");
+
+ final ConnectionEntity connection = getNifiClient().getConnectionClient().getConnection(connectionId);
+ assertEquals(connectionId, connection.getId());
+
+ final PortEntity inputPort = getNifiClient().getInputPortClient().getInputPort(inputPortId);
+ assertEquals(inputPortId, inputPort.getId());
+
+ final PortEntity outputPort = getNifiClient().getOutputPortClient().getOutputPort(outputPortId);
+ assertEquals(outputPortId, outputPort.getId());
+
+ final ControllerServiceEntity controllerService = getNifiClient().getControllerServicesClient().getControllerService(controllerServiceId);
+ assertEquals(controllerServiceId, controllerService.getId());
+
+ final ProcessGroupEntity childGroup = getNifiClient().getProcessGroupClient().getProcessGroup(childGroupId);
+ assertEquals(childGroupId, childGroup.getId());
+
+ getClientUtil().endTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.STOPPED);
+
+ assertConflictExpected("GET connection", () -> getNifiClient().getConnectionClient().getConnection(connectionId));
+ assertConflictExpected("GET input port", () -> getNifiClient().getInputPortClient().getInputPort(inputPortId));
+ assertConflictExpected("GET output port", () -> getNifiClient().getOutputPortClient().getOutputPort(outputPortId));
+ assertConflictExpected("GET controller service", () -> getNifiClient().getControllerServicesClient().getControllerService(controllerServiceId));
+ assertConflictExpected("GET process group", () -> getNifiClient().getProcessGroupClient().getProcessGroup(childGroupId));
+ }
+
+ /**
+ * Verify that the Connector's flow endpoint correctly returns the flow for a non-root child process
+ * group within the managed flow while in Troubleshooting mode. The fix that enables this is in
+ * StandardProcessGroup.findProcessGroup: child groups within a connector's managed hierarchy do not
+ * carry a connectorId field on their own ProcessGroup object, so the old connector-ID-filtered
+ * getGroup(id, connectorId) lookup always returned null for them, causing a 404. The unconditional
+ * getGroup(id) lookup combined with the existing isOwner hierarchy check is the correct approach.
+ */
+ @Test
+ public void testGetFlowForChildProcessGroupInTroubleshooting() throws NiFiClientException, IOException, InterruptedException {
+ final ConnectorEntity connector = getClientUtil().createConnector("ComponentLifecycleConnector");
+ final String connectorId = connector.getId();
+
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ final ProcessGroupFlowEntity rootFlow = getNifiClient().getConnectorClient().getFlow(connectorId);
+ final List childGroups = new ArrayList<>(rootFlow.getProcessGroupFlow().getFlow().getProcessGroups());
+ assertFalse(childGroups.isEmpty(), "ComponentLifecycleConnector managed flow must contain at least one child process group");
+
+ final String childGroupId = childGroups.get(0).getId();
+
+ final ProcessGroupFlowEntity childFlow = getNifiClient().getConnectorClient().getFlow(connectorId, childGroupId);
+ assertNotNull(childFlow);
+ assertEquals(childGroupId, childFlow.getProcessGroupFlow().getId());
+ }
+
+ private void assertConnectorState(final String connectorId, final ConnectorState expected) throws NiFiClientException, IOException {
+ final ConnectorEntity entity = getNifiClient().getConnectorClient().getConnector(connectorId);
+ assertEquals(expected.name(), entity.getComponent().getState());
+ }
+
+ /**
+ * Stop and restart the NiFi instance, then wait for all nodes to reconnect when running in a clustered
+ * environment. Subsequent flow-modifying requests (such as {@code endTroubleshooting}) would otherwise be rejected
+ * with HTTP 409 Conflict while one of the nodes is still in the CONNECTING state after the restart.
+ */
+ private void restartNiFi() {
+ getNiFiInstance().stop();
+ getNiFiInstance().start();
+
+ if (getNiFiInstance().isClustered()) {
+ waitForAllNodesConnected();
+ }
+ }
+
+ private void assertConflict(final NiFiClientException e) {
+ final Throwable cause = e.getCause();
+ if (cause instanceof WebApplicationException wae) {
+ assertEquals(409, wae.getResponse().getStatus(), "Expected 409 Conflict, got: " + wae.getResponse().getStatus());
+ return;
+ }
+
+ fail("Expected WebApplicationException 409, got: " + cause);
+ }
+
+ private List findAllProcessors(final String connectorId) throws NiFiClientException, IOException {
+ final List result = new ArrayList<>();
+ collectProcessors(connectorId, null, result);
+ return result;
+ }
+
+ private void collectProcessors(final String connectorId, final String groupId, final List collected) throws NiFiClientException, IOException {
+ final ProcessGroupFlowEntity entity = (groupId == null) ? getNifiClient().getConnectorClient().getFlow(connectorId) : getNifiClient().getConnectorClient().getFlow(connectorId, groupId);
+ final FlowDTO flow = entity.getProcessGroupFlow().getFlow();
+ collected.addAll(flow.getProcessors());
+
+ for (final ProcessGroupEntity child : flow.getProcessGroups()) {
+ collectProcessors(connectorId, child.getId(), collected);
+ }
+ }
+
+ private ProcessorEntity findProcessorByName(final String connectorId, final String name) throws NiFiClientException, IOException {
+ for (final ProcessorEntity entity : findAllProcessors(connectorId)) {
+ final ProcessorDTO dto = entity.getComponent();
+ if (name.equals(dto.getName())) {
+ return entity;
+ }
+ }
+
+ return null;
+ }
+
+ private OptionalInt getQueuedCount(final String connectorId, final String connectionId) throws NiFiClientException, IOException {
+ return collectQueuedCount(connectorId, null, connectionId);
+ }
+
+ private OptionalInt collectQueuedCount(final String connectorId, final String groupId, final String connectionId) throws NiFiClientException, IOException {
+ final ProcessGroupFlowEntity entity = (groupId == null) ? getNifiClient().getConnectorClient().getFlow(connectorId) : getNifiClient().getConnectorClient().getFlow(connectorId, groupId);
+
+ final FlowDTO flow = entity.getProcessGroupFlow().getFlow();
+ for (final ConnectionEntity connection : flow.getConnections()) {
+ if (connectionId.equals(connection.getId())) {
+ final String queued = connection.getStatus().getAggregateSnapshot().getQueued();
+ final String count = queued.substring(0, queued.indexOf(' '));
+ return OptionalInt.of(Integer.parseInt(count.replace(",", "")));
+ }
+ }
+
+ for (final ProcessGroupEntity child : flow.getProcessGroups()) {
+ final OptionalInt childResult = collectQueuedCount(connectorId, child.getId(), connectionId);
+ if (childResult.isPresent()) {
+ return childResult;
+ }
+ }
+
+ return OptionalInt.empty();
+ }
+
+ private void waitForQueuedFlowFiles(final String connectorId, final String connectionId, final int minCount) throws InterruptedException {
+ waitFor(() -> {
+ try {
+ final OptionalInt queuedCount = getQueuedCount(connectorId, connectionId);
+ if (queuedCount.isEmpty()) {
+ return false;
+ }
+ final int count = queuedCount.getAsInt();
+ if (minCount == 0) {
+ return count == 0;
+ }
+ return count >= minCount;
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+ }
+
+ private void waitForProcessorState(final String processorId, final ScheduledState desired) throws InterruptedException {
+ waitFor(() -> {
+ try {
+ final ProcessorEntity entity = getNifiClient().getProcessorClient().getProcessor(processorId);
+ return desired.name().equals(entity.getComponent().getState());
+ } catch (final Exception e) {
+ return false;
+ }
+ });
+ }
+
+ private List findProcessorsInState(final String connectorId, final ScheduledState state) throws NiFiClientException, IOException {
+ final List matches = new ArrayList<>();
+ for (final ProcessorEntity entity : findAllProcessors(connectorId)) {
+ if (state.name().equals(entity.getComponent().getState())) {
+ matches.add(entity);
+ }
+ }
+ return matches;
+ }
+
+ private String findFirstConnectionId(final String connectorId) throws NiFiClientException, IOException {
+ final FlowDTO rootFlow = getNifiClient().getConnectorClient().getFlow(connectorId).getProcessGroupFlow().getFlow();
+ if (!rootFlow.getConnections().isEmpty()) {
+ return rootFlow.getConnections().iterator().next().getId();
+ }
+ for (final ProcessGroupEntity child : rootFlow.getProcessGroups()) {
+ final FlowDTO childFlow = getNifiClient().getConnectorClient().getFlow(connectorId, child.getId()).getProcessGroupFlow().getFlow();
+ if (!childFlow.getConnections().isEmpty()) {
+ return childFlow.getConnections().iterator().next().getId();
+ }
+ }
+ return null;
+ }
+
+ private String findFirstInputPortId(final String connectorId) throws NiFiClientException, IOException {
+ return findFirstPortId(connectorId, true);
+ }
+
+ private String findFirstOutputPortId(final String connectorId) throws NiFiClientException, IOException {
+ return findFirstPortId(connectorId, false);
+ }
+
+ private String findFirstPortId(final String connectorId, final boolean input) throws NiFiClientException, IOException {
+ final FlowDTO rootFlow = getNifiClient().getConnectorClient().getFlow(connectorId).getProcessGroupFlow().getFlow();
+ for (final PortEntity port : input ? rootFlow.getInputPorts() : rootFlow.getOutputPorts()) {
+ return port.getId();
+ }
+ for (final ProcessGroupEntity child : rootFlow.getProcessGroups()) {
+ final FlowDTO childFlow = getNifiClient().getConnectorClient().getFlow(connectorId, child.getId()).getProcessGroupFlow().getFlow();
+ for (final PortEntity port : input ? childFlow.getInputPorts() : childFlow.getOutputPorts()) {
+ return port.getId();
+ }
+ }
+ return null;
+ }
+
+ private String findFirstControllerServiceId(final String connectorId) throws NiFiClientException, IOException {
+ final String managedGroupId = getNifiClient().getConnectorClient().getConnector(connectorId).getComponent().getManagedProcessGroupId();
+ return collectFirstControllerServiceId(managedGroupId);
+ }
+
+ private String collectFirstControllerServiceId(final String groupId) throws NiFiClientException, IOException {
+ for (final ControllerServiceEntity entity : getNifiClient().getFlowClient().getControllerServices(groupId).getControllerServices()) {
+ final ControllerServiceDTO dto = entity.getComponent();
+ if (dto != null) {
+ return entity.getId();
+ }
+ }
+
+ final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient().getProcessGroup(groupId);
+ for (final ProcessGroupEntity child : flow.getProcessGroupFlow().getFlow().getProcessGroups()) {
+ final String childServiceId = collectFirstControllerServiceId(child.getId());
+ if (childServiceId != null) {
+ return childServiceId;
+ }
+ }
+
+ return null;
+ }
+
+ private String findFirstChildProcessGroupId(final String connectorId) throws NiFiClientException, IOException {
+ final FlowDTO flow = getNifiClient().getConnectorClient().getFlow(connectorId).getProcessGroupFlow().getFlow();
+ for (final ProcessGroupEntity child : flow.getProcessGroups()) {
+ final ProcessGroupDTO dto = child.getComponent();
+ if (dto != null) {
+ return child.getId();
+ }
+ }
+ return null;
+ }
+
+ private void assertConflictExpected(final String description, final ConflictingCall call) {
+ try {
+ call.run();
+ fail("Expected 409 Conflict for " + description + " but request succeeded");
+ } catch (final NiFiClientException e) {
+ assertConflict(e);
+ } catch (final IOException e) {
+ fail("Unexpected IOException while invoking " + description + ": " + e.getMessage());
+ }
+ }
+
+ @FunctionalInterface
+ private interface ConflictingCall {
+ void run() throws NiFiClientException, IOException;
+ }
+
+ /**
+ * Validates that a Connector whose managed flow references parameters correctly resolves those parameter values
+ * both before and after a NiFi restart while in Troubleshooting mode. Parameter values live inside the managed
+ * Process Group's ParameterContext. The Connector's managed Parameter Context is intentionally not registered
+ * with the global ParameterContextManager and is therefore not persisted in flow.json, so on every restart the
+ * Connector's lifecycle must re-populate it. The restart path for a Connector whose effective state is
+ * TROUBLESHOOTING goes through {@code StandardConnectorRepository#syncConnector}, which calls
+ * {@code ConnectorNode#inheritConfiguration} so the Connector's applyUpdate re-populates the Parameter Context
+ * from the persisted active configuration, and then immediately overlays the persisted Troubleshooting snapshot
+ * via {@code StandardFlowContext#restoreTroubleshootingFlow}. The structural overlay preserves the in-memory Parameter
+ * Context binding, so the Connector-supplied parameter values remain available while the user's flow
+ * modifications are the ones that run. The transient Connector-supplied flow shape is invisible outside this
+ * synchronization cycle: flow synchronization completes before the FlowFile Repository attaches FlowFiles to
+ * queues, so no FlowFile movement can observe it.
+ */
+ @Test
+ public void testParameterValuesResolvedBeforeAndAfterRestartInTroubleshooting() throws NiFiClientException, IOException, InterruptedException {
+ // Use a secret name unique to this test so the underlying SecretsManager cannot return a value cached for the
+ // generic name "secret" by another test that ran earlier in the same JVM.
+ final String secretName = "parameterResolutionSecret";
+ final String sensitiveSecretValue = "my-super-secret-value";
+ final String assetFileContent = "Hello, World!";
+ final File sensitiveOutputFile = new File("target/troubleshooting-sensitive.txt");
+ final File assetOutputFile = new File("target/troubleshooting-asset.txt");
+ sensitiveOutputFile.delete();
+ assetOutputFile.delete();
+
+ final ParameterProviderEntity paramProvider = getClientUtil().createParameterProvider("PropertiesParameterProvider");
+ getClientUtil().updateParameterProviderProperties(paramProvider, Map.of("parameters", secretName + "=" + sensitiveSecretValue));
+
+ final ConnectorEntity connector = getClientUtil().createConnector("ParameterContextConnector");
+ final String connectorId = connector.getId();
+
+ final File assetFile = new File("src/test/resources/sample-assets/helloworld.txt");
+ final AssetEntity assetEntity = getNifiClient().getConnectorClient().createAsset(connectorId, assetFile.getName(), assetFile);
+ final String uploadedAssetId = assetEntity.getAsset().getId();
+
+ final ConnectorValueReferenceDTO secretRef = getClientUtil().createSecretValueReference(
+ paramProvider.getId(), secretName, "PropertiesParameterProvider.Parameters." + secretName);
+ final ConnectorValueReferenceDTO assetRef = new ConnectorValueReferenceDTO();
+ assetRef.setValueType("ASSET_REFERENCE");
+ assetRef.setAssetReferences(List.of(new AssetReferenceDTO(uploadedAssetId)));
+
+ final Map propertyValues = new HashMap<>();
+ propertyValues.put("Sensitive Value", secretRef);
+ propertyValues.put("Asset File", assetRef);
+ propertyValues.put("Sensitive Output File", createStringLiteralRef(sensitiveOutputFile.getAbsolutePath()));
+ propertyValues.put("Asset Output File", createStringLiteralRef(assetOutputFile.getAbsolutePath()));
+
+ getClientUtil().configureConnectorWithReferences(connectorId, "Parameter Context Configuration", propertyValues);
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ // Transition into Troubleshooting from STOPPED; components inside the managed flow are not yet running.
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ // First verification: parameter values resolve correctly on the initial flow, before any restart.
+ runManagedFlowAndAssertParameterValues(connectorId, sensitiveOutputFile, assetOutputFile, sensitiveSecretValue, assetFileContent, "before restart");
+
+ // Stop every component and clear the output files so that the post-restart run can prove the parameter
+ // values were re-populated correctly rather than simply finding the files produced by the pre-restart run.
+ stopAllManagedComponents(connectorId);
+ assertTrue(sensitiveOutputFile.delete() || !sensitiveOutputFile.exists(), "Failed to delete sensitive output file between runs");
+ assertTrue(assetOutputFile.delete() || !assetOutputFile.exists(), "Failed to delete asset output file between runs");
+
+ restartNiFi();
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ // Second verification: after the restore path has been exercised, the Parameter Context must still produce
+ // the correct resolved values when the processors are started again.
+ runManagedFlowAndAssertParameterValues(connectorId, sensitiveOutputFile, assetOutputFile, sensitiveSecretValue, assetFileContent, "after restart");
+ }
+
+ /**
+ * Validates that a Connector's configured (non-default) Active configuration survives a NiFi restart while the
+ * Connector is in Troubleshooting mode, and that flow modifications made during Troubleshooting are discarded
+ * when Troubleshooting exits. The end-to-end scenario:
+ *
+ * - Create a Connector and configure it with non-default property values so its active configuration and
+ * active managed flow both differ from the unconfigured defaults.
+ * - Apply the update so the active flow is the Connector's authoritative non-default flow.
+ * - Enter Troubleshooting and modify the managed flow by adding a processor that the Connector's
+ * authoritative flow does not contain.
+ * - Restart NiFi while still in Troubleshooting.
+ * - Exit Troubleshooting.
+ * - Assert the active configuration still contains the configured (non-default) values, the active flow is
+ * the Connector's authoritative non-default flow rather than the user-modified Troubleshooting flow, and
+ * finally that running the Connector produces output at the configured non-default destinations.
+ *
+ */
+ @Test
+ public void testConfigurationAndAuthoritativeFlowRestoredAfterTroubleshootingRestart() throws NiFiClientException, IOException, InterruptedException {
+ // Use a secret name unique to this test so the underlying SecretsManager cannot return a value cached for the
+ // generic name "secret" by another test that ran earlier in the same JVM.
+ final String secretName = "configurationRestoreSecret";
+ final String sensitiveSecretValue = "configured-secret-value";
+ final String assetFileContent = "Hello, World!";
+ final File configuredSensitiveOutput = new File("target/configuration-restore-sensitive.txt");
+ final File configuredAssetOutput = new File("target/configuration-restore-asset.txt");
+ configuredSensitiveOutput.delete();
+ configuredAssetOutput.delete();
+
+ final ParameterProviderEntity paramProvider = getClientUtil().createParameterProvider("PropertiesParameterProvider");
+ getClientUtil().updateParameterProviderProperties(paramProvider, Map.of("parameters", secretName + "=" + sensitiveSecretValue));
+
+ final ConnectorEntity connector = getClientUtil().createConnector("ParameterContextConnector");
+ final String connectorId = connector.getId();
+
+ final File assetFile = new File("src/test/resources/sample-assets/helloworld.txt");
+ final AssetEntity assetEntity = getNifiClient().getConnectorClient().createAsset(connectorId, assetFile.getName(), assetFile);
+ final String uploadedAssetId = assetEntity.getAsset().getId();
+
+ final ConnectorValueReferenceDTO secretRef = getClientUtil().createSecretValueReference(
+ paramProvider.getId(), secretName, "PropertiesParameterProvider.Parameters." + secretName);
+ final ConnectorValueReferenceDTO assetRef = new ConnectorValueReferenceDTO();
+ assetRef.setValueType("ASSET_REFERENCE");
+ assetRef.setAssetReferences(List.of(new AssetReferenceDTO(uploadedAssetId)));
+
+ // The output file paths differ from the property descriptors' default values, so the active configuration
+ // after restart can be checked against these specific paths to prove the configured values were preserved
+ // instead of being overwritten by defaults.
+ final Map propertyValues = new HashMap<>();
+ propertyValues.put("Sensitive Value", secretRef);
+ propertyValues.put("Asset File", assetRef);
+ propertyValues.put("Sensitive Output File", createStringLiteralRef(configuredSensitiveOutput.getAbsolutePath()));
+ propertyValues.put("Asset Output File", createStringLiteralRef(configuredAssetOutput.getAbsolutePath()));
+
+ getClientUtil().configureConnectorWithReferences(connectorId, "Parameter Context Configuration", propertyValues);
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ assertNotNull(findProcessorByName(connectorId, "UpdateContent"),
+ "Active flow should contain UpdateContent before Troubleshooting");
+ assertNotNull(findProcessorByName(connectorId, "ReplaceWithFile"),
+ "Active flow should contain ReplaceWithFile before Troubleshooting");
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ // The Sleep processor is created in STOPPED state and is left disconnected so endTroubleshooting can later
+ // succeed without first having to stop or empty any user-introduced components.
+ final String managedGroupId = getNifiClient().getConnectorClient().getConnector(connectorId).getComponent().getManagedProcessGroupId();
+ final ProcessorEntity troubleshootingProcessor = getClientUtil().createProcessor("Sleep", managedGroupId);
+ final String troubleshootingProcessorId = troubleshootingProcessor.getId();
+ assertTrue(containsProcessorId(connectorId, troubleshootingProcessorId),
+ "User-added Sleep processor should be present in the managed flow after adding it in Troubleshooting");
+
+ restartNiFi();
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+ assertTrue(containsProcessorId(connectorId, troubleshootingProcessorId),
+ "User-added Sleep processor should survive restart while in Troubleshooting");
+
+ getClientUtil().endTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.STOPPED);
+
+ final ConnectorEntity afterExit = getNifiClient().getConnectorClient().getConnector(connectorId);
+ final ConnectorConfigurationDTO activeConfig = afterExit.getComponent().getActiveConfiguration();
+ final Map activeProperties = activeConfig.getConfigurationStepConfigurations().getFirst()
+ .getPropertyGroupConfigurations().getFirst().getPropertyValues();
+ assertEquals(configuredSensitiveOutput.getAbsolutePath(), activeProperties.get("Sensitive Output File").getValue(),
+ "Active configuration must retain the configured Sensitive Output File path");
+ assertEquals(configuredAssetOutput.getAbsolutePath(), activeProperties.get("Asset Output File").getValue(),
+ "Active configuration must retain the configured Asset Output File path");
+ assertEquals("ASSET_REFERENCE", activeProperties.get("Asset File").getValueType(),
+ "Active configuration must retain the Asset reference for Asset File");
+ assertEquals(uploadedAssetId, activeProperties.get("Asset File").getAssetReferences().get(0).getId(),
+ "Active configuration must retain the uploaded Asset id for Asset File");
+ assertEquals("SECRET_REFERENCE", activeProperties.get("Sensitive Value").getValueType(),
+ "Active configuration must retain the Secret reference for Sensitive Value");
+
+ assertFalse(containsProcessorId(connectorId, troubleshootingProcessorId),
+ "User-added Sleep processor must be removed once the authoritative flow is restored on Troubleshooting exit");
+ assertNotNull(findProcessorByName(connectorId, "UpdateContent"),
+ "Restored authoritative flow should contain UpdateContent");
+ assertNotNull(findProcessorByName(connectorId, "ReplaceWithFile"),
+ "Restored authoritative flow should contain ReplaceWithFile");
+ assertNotNull(findProcessorByName(connectorId, "GenerateFlowFile"),
+ "Restored authoritative flow should contain GenerateFlowFile");
+
+ getClientUtil().startConnector(connectorId);
+ assertConnectorState(connectorId, ConnectorState.RUNNING);
+
+ waitFor(() -> configuredSensitiveOutput.exists() && configuredAssetOutput.exists());
+ assertEquals(sensitiveSecretValue, Files.readString(configuredSensitiveOutput.toPath()).trim(),
+ "Running Connector must write the configured sensitive value to the configured Sensitive Output File");
+ assertEquals(assetFileContent, Files.readString(configuredAssetOutput.toPath()).trim(),
+ "Running Connector must write the configured asset content to the configured Asset Output File");
+ }
+
+ /**
+ * Validates that Connector Assets are retained across a NiFi restart while the Connector is in Troubleshooting mode
+ * and that, on exit, the Processor properties inside the managed flow are restored to reference the assets that the
+ * Connector's authoritative flow expects.
+ *
+ * The scenario:
+ *
+ * - Create a {@code ParameterContextConnector}, upload Asset A, and configure the Connector to use Asset A.
+ * - Apply the update so the active flow's {@code ReplaceWithFile} processor's {@code Filename} property is
+ * {@code #{asset_param}}, where {@code asset_param} resolves to Asset A's file path.
+ * - Enter Troubleshooting.
+ * - Upload a second Asset B, and override the managed flow's {@code ReplaceWithFile.Filename} property to a
+ * literal that does not reference Asset A. This represents a user editing the Processor's asset-bearing
+ * property while the Connector is open for direct edits.
+ * - Restart NiFi.
+ * - After restart, assert that the Connector is still in Troubleshooting, both Assets A and B are still listed,
+ * and the user's override of {@code Filename} survived the restart.
+ * - End Troubleshooting.
+ * - Assert that the Connector's authoritative configuration is restored: {@code Asset File} still references
+ * Asset A, and {@code ReplaceWithFile.Filename} is back to {@code #{asset_param}}.
+ *
+ */
+ @Test
+ public void testAssetsRetainedAcrossRestartInTroubleshooting() throws NiFiClientException, IOException, InterruptedException {
+ final String secretName = "assetRetentionSecret";
+ final String sensitiveSecretValue = "asset-retention-secret-value";
+ final File sensitiveOutputFile = new File("target/asset-retention-sensitive.txt");
+ final File assetOutputFile = new File("target/asset-retention-asset.txt");
+ sensitiveOutputFile.delete();
+ assetOutputFile.delete();
+
+ final ParameterProviderEntity paramProvider = getClientUtil().createParameterProvider("PropertiesParameterProvider");
+ getClientUtil().updateParameterProviderProperties(paramProvider, Map.of("parameters", secretName + "=" + sensitiveSecretValue));
+
+ final ConnectorEntity connector = getClientUtil().createConnector("ParameterContextConnector");
+ final String connectorId = connector.getId();
+
+ final File assetA = new File("src/test/resources/sample-assets/helloworld.txt");
+ final File assetB = new File("src/test/resources/sample-assets/helloworld2.txt");
+ final AssetEntity assetAEntity = getNifiClient().getConnectorClient().createAsset(connectorId, assetA.getName(), assetA);
+ final String assetAId = assetAEntity.getAsset().getId();
+
+ final ConnectorValueReferenceDTO secretRef = getClientUtil().createSecretValueReference(
+ paramProvider.getId(), secretName, "PropertiesParameterProvider.Parameters." + secretName);
+ final ConnectorValueReferenceDTO assetRef = new ConnectorValueReferenceDTO();
+ assetRef.setValueType("ASSET_REFERENCE");
+ assetRef.setAssetReferences(List.of(new AssetReferenceDTO(assetAId)));
+
+ final Map propertyValues = new HashMap<>();
+ propertyValues.put("Sensitive Value", secretRef);
+ propertyValues.put("Asset File", assetRef);
+ propertyValues.put("Sensitive Output File", createStringLiteralRef(sensitiveOutputFile.getAbsolutePath()));
+ propertyValues.put("Asset Output File", createStringLiteralRef(assetOutputFile.getAbsolutePath()));
+
+ getClientUtil().configureConnectorWithReferences(connectorId, "Parameter Context Configuration", propertyValues);
+ getClientUtil().applyConnectorUpdate(connector);
+ getClientUtil().waitForValidConnector(connectorId);
+
+ final ProcessorEntity replaceWithFileBeforeTroubleshooting = findProcessorByName(connectorId, "ReplaceWithFile");
+ assertNotNull(replaceWithFileBeforeTroubleshooting, "Active flow should contain ReplaceWithFile before Troubleshooting");
+ final String authoritativeFilenameValue = replaceWithFileBeforeTroubleshooting.getComponent().getConfig().getProperties().get("Filename");
+ assertEquals("#{asset_param}", authoritativeFilenameValue,
+ "Authoritative ReplaceWithFile.Filename should be parameterized as #{asset_param} before Troubleshooting");
+
+ getClientUtil().enterTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+
+ // Upload a second Asset while in Troubleshooting and confirm it joins the existing Asset.
+ final AssetEntity assetBEntity = getNifiClient().getConnectorClient().createAsset(connectorId, assetB.getName(), assetB);
+ final String assetBId = assetBEntity.getAsset().getId();
+ assertAssetIds(connectorId, assetAId, assetBId);
+
+ // Override the Processor's asset-bearing property to a literal that does not reference the Connector's Asset A.
+ final ProcessorEntity replaceWithFileInTroubleshooting = findProcessorByName(connectorId, "ReplaceWithFile");
+ assertNotNull(replaceWithFileInTroubleshooting, "ReplaceWithFile should remain accessible while in Troubleshooting");
+ final String overriddenFilenameValue = "target/asset-retention-override.txt";
+ getClientUtil().updateProcessorProperties(replaceWithFileInTroubleshooting, Map.of("Filename", overriddenFilenameValue));
+
+ final ProcessorEntity afterPropertyOverride = findProcessorByName(connectorId, "ReplaceWithFile");
+ assertEquals(overriddenFilenameValue, afterPropertyOverride.getComponent().getConfig().getProperties().get("Filename"),
+ "ReplaceWithFile.Filename should reflect the user override applied while in Troubleshooting");
+
+ restartNiFi();
+
+ assertConnectorState(connectorId, ConnectorState.TROUBLESHOOTING);
+ assertAssetIds(connectorId, assetAId, assetBId);
+
+ final ProcessorEntity afterRestart = findProcessorByName(connectorId, "ReplaceWithFile");
+ assertNotNull(afterRestart, "ReplaceWithFile should be present after restart while in Troubleshooting");
+ assertEquals(overriddenFilenameValue, afterRestart.getComponent().getConfig().getProperties().get("Filename"),
+ "User override of ReplaceWithFile.Filename must survive a restart while in Troubleshooting");
+
+ getClientUtil().endTroubleshooting(connectorId);
+ assertConnectorState(connectorId, ConnectorState.STOPPED);
+
+ // After exit, the Connector's authoritative flow is restored. The Processor's Filename must reference the
+ // authoritative value (#{asset_param}), not the user override.
+ final ProcessorEntity replaceWithFileAfterExit = findProcessorByName(connectorId, "ReplaceWithFile");
+ assertNotNull(replaceWithFileAfterExit, "Restored authoritative flow should contain ReplaceWithFile");
+ assertEquals(authoritativeFilenameValue, replaceWithFileAfterExit.getComponent().getConfig().getProperties().get("Filename"),
+ "Authoritative ReplaceWithFile.Filename must be restored on Troubleshooting exit");
+
+ // Verify the active configuration still references Asset A (the Connector's expected asset).
+ final ConnectorEntity afterExit = getNifiClient().getConnectorClient().getConnector(connectorId);
+ final ConnectorConfigurationDTO activeConfig = afterExit.getComponent().getActiveConfiguration();
+ final Map activeProperties = activeConfig.getConfigurationStepConfigurations().getFirst()
+ .getPropertyGroupConfigurations().getFirst().getPropertyValues();
+ assertEquals("ASSET_REFERENCE", activeProperties.get("Asset File").getValueType(),
+ "Active configuration must retain the Asset reference for Asset File");
+ assertEquals(assetAId, activeProperties.get("Asset File").getAssetReferences().get(0).getId(),
+ "Active configuration must continue to reference Asset A (the Connector's expected asset) after Troubleshooting exit");
+ }
+
+ private void assertAssetIds(final String connectorId, final String... expectedAssetIds) throws NiFiClientException, IOException {
+ final List actualAssetIds = new ArrayList<>();
+ final AssetsEntity assetsEntity = getNifiClient().getConnectorClient().getAssets(connectorId);
+ if (assetsEntity != null && assetsEntity.getAssets() != null) {
+ for (final AssetEntity asset : assetsEntity.getAssets()) {
+ if (asset.getAsset() != null) {
+ actualAssetIds.add(asset.getAsset().getId());
+ }
+ }
+ }
+ for (final String expected : expectedAssetIds) {
+ assertTrue(actualAssetIds.contains(expected),
+ "Expected Connector " + connectorId + " to contain Asset " + expected + " but found: " + actualAssetIds);
+ }
+ }
+
+ private boolean containsProcessorId(final String connectorId, final String processorId) throws NiFiClientException, IOException {
+ for (final ProcessorEntity entity : findAllProcessors(connectorId)) {
+ if (processorId.equals(entity.getId())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void runManagedFlowAndAssertParameterValues(final String connectorId, final File sensitiveOutputFile, final File assetOutputFile,
+ final String expectedSensitiveValue, final String expectedAssetValue, final String phase)
+ throws NiFiClientException, IOException, InterruptedException {
+
+ // Starting individual components is permitted while in Troubleshooting. The managed Process Group is a
+ // standard (non-stateless) group so processors and ports can be scheduled individually. The flow built by
+ // ParameterContextConnector routes FlowFiles through child group Input Ports, so every Port inside the
+ // managed flow must also be started for the pipeline to actually pass FlowFiles.
+ final List inputPorts = findAllInputPorts(connectorId);
+ final List outputPorts = findAllOutputPorts(connectorId);
+ final List processors = findAllProcessors(connectorId);
+ assertFalse(processors.isEmpty(), "Managed flow should contain processors " + phase);
+
+ for (final PortEntity port : inputPorts) {
+ getNifiClient().getInputPortClient().startInputPort(port);
+ }
+ for (final PortEntity port : outputPorts) {
+ getNifiClient().getOutputPortClient().startOutputPort(port);
+ }
+ for (final ProcessorEntity processor : processors) {
+ getClientUtil().waitForValidProcessor(processor.getId());
+ getClientUtil().startProcessor(processor);
+ }
+
+ waitFor(() -> sensitiveOutputFile.exists() && assetOutputFile.exists());
+
+ assertEquals(expectedSensitiveValue, Files.readString(sensitiveOutputFile.toPath()).trim(),
+ "Sensitive output file must contain the configured sensitive parameter value " + phase);
+ assertEquals(expectedAssetValue, Files.readString(assetOutputFile.toPath()).trim(),
+ "Asset output file must contain the asset contents referenced by the asset parameter " + phase);
+ }
+
+ private void stopAllManagedComponents(final String connectorId) throws NiFiClientException, IOException, InterruptedException {
+ for (final ProcessorEntity processor : findAllProcessors(connectorId)) {
+ getClientUtil().stopProcessor(processor);
+ }
+ for (final PortEntity port : findAllInputPorts(connectorId)) {
+ getNifiClient().getInputPortClient().stopInputPort(port);
+ }
+ for (final PortEntity port : findAllOutputPorts(connectorId)) {
+ getNifiClient().getOutputPortClient().stopOutputPort(port);
+ }
+ for (final ProcessorEntity processor : findAllProcessors(connectorId)) {
+ waitForProcessorState(processor.getId(), ScheduledState.STOPPED);
+ }
+ }
+
+ private List findAllInputPorts(final String connectorId) throws NiFiClientException, IOException {
+ final List result = new ArrayList<>();
+ collectPorts(connectorId, null, true, result);
+ return result;
+ }
+
+ private List findAllOutputPorts(final String connectorId) throws NiFiClientException, IOException {
+ final List result = new ArrayList<>();
+ collectPorts(connectorId, null, false, result);
+ return result;
+ }
+
+ private void collectPorts(final String connectorId, final String groupId, final boolean input, final List collected) throws NiFiClientException, IOException {
+ final ProcessGroupFlowEntity entity = (groupId == null) ? getNifiClient().getConnectorClient().getFlow(connectorId) : getNifiClient().getConnectorClient().getFlow(connectorId, groupId);
+ final FlowDTO flow = entity.getProcessGroupFlow().getFlow();
+ collected.addAll(input ? flow.getInputPorts() : flow.getOutputPorts());
+
+ for (final ProcessGroupEntity child : flow.getProcessGroups()) {
+ collectPorts(connectorId, child.getId(), input, collected);
+ }
+ }
+
+ private ConnectorValueReferenceDTO createStringLiteralRef(final String value) {
+ final ConnectorValueReferenceDTO ref = new ConnectorValueReferenceDTO();
+ ref.setValueType("STRING_LITERAL");
+ ref.setValue(value);
+ return ref;
+ }
+}
diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
index 05f9ed173c3d..14a909fc3bc2 100644
--- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
+++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java
@@ -177,6 +177,28 @@ public interface ConnectorClient {
*/
ConnectorEntity cancelDrain(ConnectorEntity connectorEntity) throws NiFiClientException, IOException;
+ /**
+ * Transitions a connector into Troubleshooting mode.
+ *
+ * @param connectorEntity the connector entity (must contain id and revision)
+ * @return the updated connector entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ConnectorEntity enterTroubleshooting(ConnectorEntity connectorEntity) throws NiFiClientException, IOException;
+
+ /**
+ * Transitions a connector out of Troubleshooting mode.
+ *
+ * @param connectorId the connector ID
+ * @param clientId the client ID
+ * @param version the revision version
+ * @return the updated connector entity
+ * @throws NiFiClientException if an error occurs during the request
+ * @throws IOException if an I/O error occurs
+ */
+ ConnectorEntity endTroubleshooting(String connectorId, String clientId, long version) throws NiFiClientException, IOException;
+
/**
* Gets the configuration step names for a connector.
*
diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
index 7b648e7183e6..d972f4891591 100644
--- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
+++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java
@@ -235,6 +235,64 @@ private ConnectorEntity cancelDrain(final String connectorId, final String clien
});
}
+ @Override
+ public ConnectorEntity enterTroubleshooting(final ConnectorEntity connectorEntity) throws NiFiClientException, IOException {
+ return enterTroubleshooting(connectorEntity.getId(), connectorEntity.getRevision().getClientId(),
+ connectorEntity.getRevision().getVersion(), connectorEntity.isDisconnectedNodeAcknowledged());
+ }
+
+ private ConnectorEntity enterTroubleshooting(final String connectorId, final String clientId, final long version,
+ final Boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector ID cannot be null or blank");
+ }
+
+ return executeAction("Error entering Troubleshooting mode", () -> {
+ final WebTarget target = connectorTarget
+ .path("/troubleshooting")
+ .resolveTemplate("id", connectorId);
+
+ final ConnectorEntity requestEntity = new ConnectorEntity();
+ requestEntity.setId(connectorId);
+ requestEntity.setDisconnectedNodeAcknowledged(disconnectedNodeAcknowledged);
+
+ final RevisionDTO revisionDto = new RevisionDTO();
+ revisionDto.setClientId(clientId);
+ revisionDto.setVersion(version);
+ requestEntity.setRevision(revisionDto);
+
+ return getRequestBuilder(target).post(
+ Entity.entity(requestEntity, MediaType.APPLICATION_JSON_TYPE),
+ ConnectorEntity.class);
+ });
+ }
+
+ @Override
+ public ConnectorEntity endTroubleshooting(final String connectorId, final String clientId, final long version) throws NiFiClientException, IOException {
+ return endTroubleshooting(connectorId, clientId, version, false);
+ }
+
+ private ConnectorEntity endTroubleshooting(final String connectorId, final String clientId, final long version,
+ final Boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(connectorId)) {
+ throw new IllegalArgumentException("Connector ID cannot be null or blank");
+ }
+
+ return executeAction("Error ending Troubleshooting mode", () -> {
+ WebTarget target = connectorTarget
+ .path("/troubleshooting")
+ .queryParam("version", version)
+ .queryParam("clientId", clientId)
+ .resolveTemplate("id", connectorId);
+
+ if (disconnectedNodeAcknowledged == Boolean.TRUE) {
+ target = target.queryParam("disconnectedNodeAcknowledged", "true");
+ }
+
+ return getRequestBuilder(target).delete(ConnectorEntity.class);
+ });
+ }
+
private ConnectorEntity updateConnectorRunStatus(final String connectorId, final String desiredState, final String clientId,
final long version, final Boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException {
if (StringUtils.isBlank(connectorId)) {
diff --git a/pom.xml b/pom.xml
index af98cd0c526d..87eadc2567cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,7 @@
v24.14.1
- 2.8.0
+ 2.9.0-SNAPSHOT
2.3.0