Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,25 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();

public static final AllowableValue CONFLICT_RESOLUTION_FAIL = new AllowableValue("Fail", "Fail",
"When the destination file already exists, leave the source file in place and log a warning.");
public static final AllowableValue CONFLICT_RESOLUTION_REPLACE = new AllowableValue("Replace", "Replace",
"When the destination file already exists, delete it and rename the source file to the destination. "
+ "Use this option for SFTP servers (e.g. OpenSSH) that do not support atomic overwrite on rename.");
public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Move Conflict Resolution")
.description(String.format("Specifies how to handle the case when the destination file already exists "
+ "when '%s' is '%s'. '%s' leaves the source file in place and logs a warning. "
+ "'%s' explicitly deletes the destination and retries the rename.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(),
CONFLICT_RESOLUTION_FAIL.getDisplayName(), CONFLICT_RESOLUTION_REPLACE.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(CONFLICT_RESOLUTION_FAIL, CONFLICT_RESOLUTION_REPLACE)
.defaultValue(CONFLICT_RESOLUTION_FAIL.getValue())
.dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE)
.required(true)
.build();

public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Log Level When File Not Found")
.description("Log level to use in case the file does not exist when the processor is triggered")
Expand Down Expand Up @@ -354,6 +373,7 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");
final String conflictResolution = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue();

try {
final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
Expand All @@ -363,6 +383,14 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename);
if (CONFLICT_RESOLUTION_REPLACE.getValue().equalsIgnoreCase(conflictResolution)) {
final FileInfo existingDestination = transfer.getRemoteFileInfo(flowFile, absoluteTargetDirPath, simpleFilename);
if (existingDestination != null) {
getLogger().debug("Destination [{}] already exists; removing before rename per conflict resolution Replace", destinationPath);
transfer.deleteFile(flowFile, null, destinationPath);
}
}

transfer.rename(flowFile, filename, destinationPath);

} catch (final IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class FetchSFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
DISABLE_DIRECTORY_LISTING,
SFTPTransfer.CONNECTION_TIMEOUT,
SFTPTransfer.DATA_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;

import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer;
import org.apache.nifi.processors.standard.util.SFTPTransfer;
import org.apache.nifi.processors.standard.util.SSHTestServer;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class TestFetchSFTP {

private static final String SOURCE_FILENAME = "test.txt";
private static final String SOURCE_CONTENTS = "source content";
private static final String DESTINATION_DIR = "/completed";

private SSHTestServer sshTestServer;
private TestRunner runner;
private Path serverRootPath;

@BeforeEach
void setup(@TempDir final Path rootPath) throws IOException {
sshTestServer = new SSHTestServer(rootPath);
sshTestServer.startServer();
serverRootPath = rootPath;

runner = TestRunners.newTestRunner(FetchSFTP.class);
runner.setProperty(FetchFileTransfer.HOSTNAME, sshTestServer.getHost());
runner.setProperty(FetchFileTransfer.UNDEFAULTED_PORT, Integer.toString(sshTestServer.getSSHPort()));
runner.setProperty(SFTPTransfer.USERNAME, sshTestServer.getUsername());
runner.setProperty(SFTPTransfer.PASSWORD, sshTestServer.getPassword());
runner.setProperty(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT, Boolean.FALSE.toString());
runner.setProperty(SFTPTransfer.STRICT_HOST_KEY_CHECKING, Boolean.FALSE.toString());
runner.setProperty(SFTPTransfer.DATA_TIMEOUT, "30 sec");
}

@AfterEach
void stopServer() throws IOException {
sshTestServer.stopServer();
}

@Test
void testMoveCompletionStrategyCleansUpSourceFile() throws IOException {
final Path sourceDir = Files.createDirectory(serverRootPath.resolve("source"));
final Path sourceFile = sourceDir.resolve(SOURCE_FILENAME);
Files.writeString(sourceFile, SOURCE_CONTENTS, StandardCharsets.UTF_8);

final Path completedDir = Files.createDirectory(serverRootPath.resolve("completed"));

runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "/source/" + SOURCE_FILENAME);
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, DESTINATION_DIR);
runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, Boolean.FALSE.toString());

runner.enqueue(new byte[0]);
runner.run();

runner.assertTransferCount(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(Files.exists(sourceFile), "Source file should have been moved");
assertTrue(Files.exists(completedDir.resolve(SOURCE_FILENAME)), "File should exist in destination directory");
}

@Test
void testMoveCompletionStrategyOverwritesExistingDestinationFile() throws IOException {
// When conflict resolution is set to Replace, FetchSFTP should proactively delete the destination
// before renaming. This mirrors the behaviour needed for SFTP servers (e.g. OpenSSH) that return
// SSH_FX_FAILURE on rename when the destination already exists.
final Path sourceDir = Files.createDirectory(serverRootPath.resolve("source"));
final Path sourceFile = sourceDir.resolve(SOURCE_FILENAME);
Files.writeString(sourceFile, SOURCE_CONTENTS, StandardCharsets.UTF_8);

final Path completedDir = Files.createDirectory(serverRootPath.resolve("completed"));
final Path existingDestination = completedDir.resolve(SOURCE_FILENAME);
Files.writeString(existingDestination, "old content", StandardCharsets.UTF_8);

runner.setProperty(FetchFileTransfer.REMOTE_FILENAME, "/source/" + SOURCE_FILENAME);
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, DESTINATION_DIR);
runner.setProperty(FetchFileTransfer.MOVE_CREATE_DIRECTORY, Boolean.FALSE.toString());
runner.setProperty(FetchFileTransfer.MOVE_CONFLICT_RESOLUTION, FetchFileTransfer.CONFLICT_RESOLUTION_REPLACE.getValue());

runner.enqueue(new byte[0]);
runner.run();

runner.assertTransferCount(FetchFileTransfer.REL_SUCCESS, 1);
assertFalse(Files.exists(sourceFile), "Source file should have been moved");
assertTrue(Files.exists(existingDestination), "Destination file should exist after move");
}
}
Loading