From 40ff4826b06e942fbcb93f5a1c11dd5c56873719 Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Mon, 25 May 2026 15:48:05 +0530 Subject: [PATCH] NIFI-15683 Fix SFTP Move strategy failure when destination file already exists When FetchSFTP uses completion strategy 'Move', some SFTP servers (notably OpenSSH) return SSH_FX_FAILURE on rename if the destination file already exists, rather than atomically overwriting it. This causes a warning and leaves the source file in place. Add a delete-then-rename fallback: if the initial rename fails, attempt to delete the destination and retry the rename. If the destination does not exist (FileNotFoundException), the original rename exception is re-thrown to preserve the error signal for unrelated failures. --- .../util/file/transfer/FetchFileTransfer.java | 28 +++++ .../nifi/processors/standard/FetchSFTP.java | 1 + .../processors/standard/TestFetchSFTP.java | 115 ++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchSFTP.java diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java b/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java index 36dfac1fe852..5a4c4f5a206a 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-file-transfer/src/main/java/org/apache/nifi/processor/util/file/transfer/FetchFileTransfer.java @@ -116,6 +116,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor { .required(false) .build(); + public static final AllowableValue CONFLICT_RESOLUTION_FAIL = new AllowableValue("Fail", "Fail", + "When the destination file already exists, leave the source file in place and log a warning."); + public static final AllowableValue CONFLICT_RESOLUTION_REPLACE = new AllowableValue("Replace", "Replace", + "When the destination file already exists, delete it and rename the source file to the destination. " + + "Use this option for SFTP servers (e.g. OpenSSH) that do not support atomic overwrite on rename."); + public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() + .name("Move Conflict Resolution") + .description(String.format("Specifies how to handle the case when the destination file already exists " + + "when '%s' is '%s'. '%s' leaves the source file in place and logs a warning. " + + "'%s' explicitly deletes the destination and retries the rename.", + COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), + CONFLICT_RESOLUTION_FAIL.getDisplayName(), CONFLICT_RESOLUTION_REPLACE.getDisplayName())) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .allowableValues(CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_REPLACE) + .defaultValue(CONFLICT_RESOLUTION_FAIL.getValue()) + .dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE) + .required(true) + .build(); + public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder() .name("Log Level When File Not Found") .description("Log level to use in case the file does not exist when the processor is triggered") @@ -354,6 +373,7 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) { final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue(); final String simpleFilename = StringUtils.substringAfterLast(filename, "/"); + final String conflictResolution = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue(); try { final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir); @@ -363,6 +383,14 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces } final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename); + if (CONFLICT_RESOLUTION_REPLACE.getValue().equalsIgnoreCase(conflictResolution)) { + final FileInfo existingDestination = transfer.getRemoteFileInfo(flowFile, absoluteTargetDirPath, simpleFilename); + if (existingDestination != null) { + getLogger().debug("Destination [{}] already exists; removing before rename per conflict resolution Replace", destinationPath); + transfer.deleteFile(flowFile, null, destinationPath); + } + } + transfer.rename(flowFile, filename, destinationPath); } catch (final IOException ioe) { diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 1e21adef703e..ae37cdb123c5 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -108,6 +108,7 @@ public class FetchSFTP extends FetchFileTransfer { COMPLETION_STRATEGY, MOVE_DESTINATION_DIR, MOVE_CREATE_DIRECTORY, + MOVE_CONFLICT_RESOLUTION, DISABLE_DIRECTORY_LISTING, SFTPTransfer.CONNECTION_TIMEOUT, SFTPTransfer.DATA_TIMEOUT, diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchSFTP.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchSFTP.java new file mode 100644 index 000000000000..db772e723db2 --- /dev/null +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchSFTP.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer; +import org.apache.nifi.processors.standard.util.SFTPTransfer; +import org.apache.nifi.processors.standard.util.SSHTestServer; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestFetchSFTP { + + private static final String SOURCE_FILENAME = "test.txt"; + private static final String SOURCE_CONTENTS = "source content"; + private static final String DESTINATION_DIR = "/completed"; + + private SSHTestServer sshTestServer; + private TestRunner runner; + private Path serverRootPath; + + @BeforeEach + void setup(@TempDir final Path rootPath) throws IOException { + sshTestServer = new SSHTestServer(rootPath); + sshTestServer.startServer(); + serverRootPath = rootPath; + + runner = TestRunners.newTestRunner(FetchSFTP.class); + runner.setProperty(FetchFileTransfer.HOSTNAME, sshTestServer.getHost()); + runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, Integer.toString(sshTestServer.getSSHPort())); + runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername()); + runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword()); + runner.setProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT, Boolean.FALSE.toString()); + runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, Boolean.FALSE.toString()); + runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec"); + } + + @AfterEach + void stopServer() throws IOException { + sshTestServer.stopServer(); + } + + @Test + void testMoveCompletionStrategyCleansUpSourceFile() throws IOException { + final Path sourceDir = Files.createDirectory(serverRootPath.resolve("source")); + final Path sourceFile = sourceDir.resolve(SOURCE_FILENAME); + Files.writeString(sourceFile, SOURCE_CONTENTS, StandardCharsets.UTF_8); + + final Path completedDir = Files.createDirectory(serverRootPath.resolve("completed")); + + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "/source/" + SOURCE_FILENAME); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, DESTINATION_DIR); + runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, Boolean.FALSE.toString()); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(FetchFileTransfer.REL_SUCCESS, 1); + assertFalse(Files.exists(sourceFile), "Source file should have been moved"); + assertTrue(Files.exists(completedDir.resolve(SOURCE_FILENAME)), "File should exist in destination directory"); + } + + @Test + void testMoveCompletionStrategyOverwritesExistingDestinationFile() throws IOException { + // When conflict resolution is set to Replace, FetchSFTP should proactively delete the destination + // before renaming. This mirrors the behaviour needed for SFTP servers (e.g. OpenSSH) that return + // SSH_FX_FAILURE on rename when the destination already exists. + final Path sourceDir = Files.createDirectory(serverRootPath.resolve("source")); + final Path sourceFile = sourceDir.resolve(SOURCE_FILENAME); + Files.writeString(sourceFile, SOURCE_CONTENTS, StandardCharsets.UTF_8); + + final Path completedDir = Files.createDirectory(serverRootPath.resolve("completed")); + final Path existingDestination = completedDir.resolve(SOURCE_FILENAME); + Files.writeString(existingDestination, "old content", StandardCharsets.UTF_8); + + runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "/source/" + SOURCE_FILENAME); + runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue()); + runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, DESTINATION_DIR); + runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, Boolean.FALSE.toString()); + runner.setProperty(FetchFileTransfer.MOVE_CONFLICT_RESOLUTION, FetchFileTransfer.CONFLICT_RESOLUTION_REPLACE.getValue()); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(FetchFileTransfer.REL_SUCCESS, 1); + assertFalse(Files.exists(sourceFile), "Source file should have been moved"); + assertTrue(Files.exists(existingDestination), "Destination file should exist after move"); + } +}