diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystem.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystem.java index 638663aa38..c79d7b9a9f 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystem.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystem.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; +import java.io.IOException; + /** * Implementation of the Fluss {@link FileSystem} interface for S3. This class implements the common * behavior implemented directly by Fluss and delegates common calls to an implementation of @@ -52,7 +54,7 @@ public S3FileSystem( } @Override - public ObtainedSecurityToken obtainSecurityToken() { + public ObtainedSecurityToken obtainSecurityToken() throws IOException { if (s3DelegationTokenProvider == null) { synchronized (this) { if (s3DelegationTokenProvider == null) { diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java index 92ed4615b3..2e5b58916e 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java @@ -23,14 +23,17 @@ import org.apache.fluss.fs.FileSystem; import org.apache.fluss.fs.FileSystemPlugin; import org.apache.fluss.fs.s3.token.S3ADelegationTokenReceiver; +import org.apache.fluss.fs.s3.token.S3DelegationTokenProvider; import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; +import java.util.Map; import java.util.Objects; import static org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver.PROVIDER_CONFIG_NAME; @@ -42,6 +45,10 @@ public class S3FileSystemPlugin implements FileSystemPlugin { private static final String[] FLUSS_CONFIG_PREFIXES = {"s3.", "s3a.", "fs.s3a."}; + private static final String[] CREDENTIAL_PROVIDER_CONFIG_KEYS = { + "s3.aws.credentials.provider", "s3a.aws.credentials.provider", PROVIDER_CONFIG_NAME + }; + private static final String HADOOP_CONFIG_PREFIX = "fs.s3a."; private static final String ACCESS_KEY_ID = "fs.s3a.access.key"; @@ -74,6 +81,12 @@ public FileSystem create(URI fsUri, Configuration flussConfig) throws IOExceptio org.apache.hadoop.conf.Configuration buildHadoopConfiguration(Configuration flussConfig) { org.apache.hadoop.conf.Configuration hadoopConfig = mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig)); + boolean hasCredentialProvider = hasConfiguredCredentialProvider(flussConfig); + // Preserve whether the provider came from Fluss config. Token providers should not infer + // explicit server-side provider mode from Hadoop default resources. + hadoopConfig.setBoolean( + S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, + hasCredentialProvider); setCredentialProvider(hadoopConfig); return hadoopConfig; } @@ -130,27 +143,55 @@ private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopCon } private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) { + boolean hasCredentialProvider = + hadoopConfig.getBoolean( + S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, false); boolean hasStaticKeys = hadoopConfig.get(ACCESS_KEY_ID) != null && hadoopConfig.get(ACCESS_KEY_SECRET) != null; boolean hasRoleArn = hadoopConfig.get(ROLE_ARN_KEY) != null; + if (hasCredentialProvider) { + if (hasRoleArn) { + throw new IllegalArgumentException( + "AssumeRole and a custom AWS credentials provider cannot be configured together."); + } + LOG.info( + "Using configured AWS credential provider(s) for server-side S3 access: {}", + hadoopConfig.get(PROVIDER_CONFIG_NAME)); + return; + } + if (hasStaticKeys || hasRoleArn) { LOG.info( hasStaticKeys ? "Using provided static credentials." : "Using default AWS credential chain with AssumeRole."); + return; + } + + if (Objects.equals(getScheme(), "s3")) { + S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig); + } else if (Objects.equals(getScheme(), "s3a")) { + S3ADelegationTokenReceiver.updateHadoopConfig(hadoopConfig); } else { - if (Objects.equals(getScheme(), "s3")) { - S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig); - } else if (Objects.equals(getScheme(), "s3a")) { - S3ADelegationTokenReceiver.updateHadoopConfig(hadoopConfig); - } else { - throw new IllegalArgumentException("Unsupported scheme: " + getScheme()); + throw new IllegalArgumentException("Unsupported scheme: " + getScheme()); + } + LOG.info( + "Using credential provider {} for delegated tokens.", + hadoopConfig.get(PROVIDER_CONFIG_NAME)); + } + + private boolean hasConfiguredCredentialProvider(Configuration flussConfig) { + if (flussConfig == null) { + return false; + } + Map configMap = flussConfig.toMap(); + for (String key : CREDENTIAL_PROVIDER_CONFIG_KEYS) { + if (StringUtils.isNotBlank(configMap.get(key))) { + return true; } - LOG.info( - "Using credential provider {} for delegated tokens.", - hadoopConfig.get(PROVIDER_CONFIG_NAME)); } + return false; } } diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java index 2a5ee6212a..e495dfdff9 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java @@ -20,6 +20,9 @@ import org.apache.fluss.fs.token.CredentialsJsonSerde; import org.apache.fluss.fs.token.ObtainedSecurityToken; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSSessionCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; @@ -29,12 +32,16 @@ import com.amazonaws.services.securitytoken.model.AssumeRoleResult; import com.amazonaws.services.securitytoken.model.Credentials; import com.amazonaws.services.securitytoken.model.GetSessionTokenResult; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; +import org.apache.hadoop.fs.s3a.S3AUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -49,6 +56,9 @@ public class S3DelegationTokenProvider { private static final String ACCESS_KEY_ID = "fs.s3a.access.key"; private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key"; + private static final String AWS_CREDENTIALS_PROVIDER = "fs.s3a.aws.credentials.provider"; + public static final String CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED = + "fluss.fs.s3.aws.credentials.provider.explicitly.configured"; private static final String REGION_KEY = "fs.s3a.region"; private static final String ENDPOINT_KEY = "fs.s3a.endpoint"; @@ -63,9 +73,10 @@ public class S3DelegationTokenProvider { @Nullable private final String secretKey; @Nullable private final String roleArn; @Nullable private final String stsEndpoint; + @Nullable private final AWSCredentialProviderList credentialProviderList; private final Map additionInfos; - public S3DelegationTokenProvider(String scheme, Configuration conf) { + public S3DelegationTokenProvider(String scheme, Configuration conf) throws IOException { this.scheme = scheme; this.region = conf.get(REGION_KEY); checkArgument(region != null, "Region is not set."); @@ -73,11 +84,20 @@ public S3DelegationTokenProvider(String scheme, Configuration conf) { this.secretKey = conf.get(ACCESS_KEY_SECRET); this.roleArn = conf.get(ROLE_ARN_KEY); this.stsEndpoint = conf.get(STS_ENDPOINT_KEY); + boolean hasCredentialProvider = + conf.getBoolean(CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, false) + && StringUtils.isNotBlank(conf.getTrimmed(AWS_CREDENTIALS_PROVIDER)); checkArgument( (accessKey == null) == (secretKey == null), "S3 access key and secret key must both be set or both be unset."); - if (accessKey == null) { + if (hasCredentialProvider && roleArn != null) { + throw new IllegalArgumentException( + "AssumeRole and a custom AWS credentials provider cannot be configured together."); + } + this.credentialProviderList = + hasCredentialProvider ? S3AUtils.createAWSCredentialProviderSet(null, conf) : null; + if (accessKey == null && credentialProviderList == null) { checkArgument( roleArn != null, "Role ARN must be set when static credentials are not provided."); @@ -106,8 +126,10 @@ public ObtainedSecurityToken obtainSecurityToken() { credentials = result.getCredentials(); } else { LOG.info( - "Obtaining session credentials via GetSessionToken with access key: {}", - S3TokenLogUtils.maskAccessKey(accessKey)); + "Obtaining session credentials via GetSessionToken{}.", + credentialProviderList != null + ? " with configured AWS credentials provider" + : " with access key: " + S3TokenLogUtils.maskAccessKey(accessKey)); GetSessionTokenResult result = stsClient.getSessionToken(); credentials = result.getCredentials(); } @@ -131,10 +153,9 @@ private AWSSecurityTokenService buildStsClient() { AWSSecurityTokenServiceClientBuilder builder = AWSSecurityTokenServiceClientBuilder.standard(); - if (accessKey != null && secretKey != null) { - builder.withCredentials( - new AWSStaticCredentialsProvider( - new BasicAWSCredentials(accessKey, secretKey))); + AWSCredentialsProvider stsCredentialsProvider = createStsCredentialsProvider(); + if (stsCredentialsProvider != null) { + builder.withCredentials(stsCredentialsProvider); } if (stsEndpoint != null) { @@ -147,6 +168,33 @@ private AWSSecurityTokenService buildStsClient() { return builder.build(); } + @Nullable + AWSCredentialsProvider createStsCredentialsProvider() { + if (credentialProviderList != null) { + AWSCredentials credentials = credentialProviderList.getCredentials(); + checkArgument( + !(credentials instanceof AWSSessionCredentials), + "Session credentials from the configured AWS credentials provider are not supported " + + "for Fluss S3 client-token generation."); + checkArgument( + credentials.getAWSAccessKeyId() != null + && credentials.getAWSSecretKey() != null, + "The configured AWS credentials provider must return an access key and secret key."); + LOG.info( + "Using configured AWS credentials provider for STS GetSessionToken with access key: {}", + S3TokenLogUtils.maskAccessKey(credentials.getAWSAccessKeyId())); + return new AWSStaticCredentialsProvider( + new BasicAWSCredentials( + credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey())); + } + + if (accessKey != null && secretKey != null) { + return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); + } + + return null; + } + private byte[] toJson(Credentials credentials) { org.apache.fluss.fs.token.Credentials flussCredentials = new org.apache.fluss.fs.token.Credentials( diff --git a/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java b/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java index 3b5a445a13..d946348551 100644 --- a/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java +++ b/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/S3FileSystemPluginTest.java @@ -19,6 +19,8 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.fs.s3.token.DynamicTemporaryAWSCredentialsProvider; +import org.apache.fluss.fs.s3.token.S3DelegationTokenProvider; +import org.apache.fluss.fs.s3.token.S3DelegationTokenProviderTest; import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver; import org.apache.fluss.fs.token.Credentials; import org.apache.fluss.fs.token.CredentialsJsonSerde; @@ -29,6 +31,7 @@ import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for server/client detection in {@link S3FileSystemPlugin}. */ class S3FileSystemPluginTest { @@ -63,6 +66,66 @@ void testServerModeWithRoleArnOnly() { assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME); } + @Test + void testServerModeWithConfiguredCredentialProvider() { + Configuration flussConfig = new Configuration(); + flussConfig.setString( + PROVIDER_CONFIG, + S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class.getName()); + + S3FileSystemPlugin plugin = new S3FileSystemPlugin(); + org.apache.hadoop.conf.Configuration hadoopConfig = + plugin.buildHadoopConfiguration(flussConfig); + + String providers = hadoopConfig.get(PROVIDER_CONFIG, ""); + assertThat(providers) + .isEqualTo( + S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class + .getName()); + assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME); + assertThat( + hadoopConfig.getBoolean( + S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, + false)) + .isTrue(); + } + + @Test + void testServerModeWithConfiguredCredentialProviderForS3A() { + Configuration flussConfig = new Configuration(); + flussConfig.setString( + PROVIDER_CONFIG, + S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class.getName()); + + S3AFileSystemPlugin plugin = new S3AFileSystemPlugin(); + org.apache.hadoop.conf.Configuration hadoopConfig = + plugin.buildHadoopConfiguration(flussConfig); + + String providers = hadoopConfig.get(PROVIDER_CONFIG, ""); + assertThat(providers) + .isEqualTo( + S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class + .getName()); + assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME); + } + + @Test + void testConfiguredCredentialProviderWithRoleArnThrows() { + Configuration flussConfig = new Configuration(); + flussConfig.setString( + PROVIDER_CONFIG, + S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class.getName()); + flussConfig.setString( + "fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role"); + + S3FileSystemPlugin plugin = new S3FileSystemPlugin(); + + assertThatThrownBy(() -> plugin.buildHadoopConfiguration(flussConfig)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("AssumeRole") + .hasMessageContaining("custom AWS credentials provider"); + } + @Test void testClientModeWithDelegatedCredentials() { // Pre-populate receiver so updateHadoopConfig does not throw. @@ -84,5 +147,10 @@ void testClientModeWithDelegatedCredentials() { String providers = hadoopConfig.get(PROVIDER_CONFIG, ""); assertThat(providers).contains(DynamicTemporaryAWSCredentialsProvider.NAME); + assertThat( + hadoopConfig.getBoolean( + S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, + false)) + .isFalse(); } } diff --git a/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProviderTest.java b/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProviderTest.java index 172dec5b58..eb865f7a43 100644 --- a/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProviderTest.java +++ b/fluss-filesystems/fluss-fs-s3/src/test/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProviderTest.java @@ -17,17 +17,28 @@ package org.apache.fluss.fs.s3.token; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.BasicSessionCredentials; import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.net.URI; + +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link S3DelegationTokenProvider} constructor validation. */ -class S3DelegationTokenProviderTest { +public class S3DelegationTokenProviderTest { + + private static final String PROVIDER_CONFIG = "fs.s3a.aws.credentials.provider"; @Test - void testDefaultChainWithRoleArn() { + void testDefaultChainWithRoleArn() throws IOException { Configuration conf = new Configuration(); conf.set("fs.s3a.region", "us-east-1"); conf.set("fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role"); @@ -55,4 +66,154 @@ void testPartialStaticCredentialsThrows() { .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("must both be set or both be unset"); } + + @Test + void testConfiguredProviderWithoutStaticCredentialsIsAccepted() { + Configuration conf = new Configuration(); + conf.set("fs.s3a.region", "us-east-1"); + setConfiguredProvider(conf, RefreshableCredentialsProvider.class); + + assertThatCode(() -> new S3DelegationTokenProvider("s3", conf)).doesNotThrowAnyException(); + } + + @Test + void testConfiguredProviderRequiresRegion() { + Configuration conf = new Configuration(); + setConfiguredProvider(conf, RefreshableCredentialsProvider.class); + + assertThatThrownBy(() -> new S3DelegationTokenProvider("s3", conf)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Region is not set"); + } + + @Test + void testConfiguredProviderWithRoleArnThrows() { + Configuration conf = new Configuration(); + conf.set("fs.s3a.region", "us-east-1"); + setConfiguredProvider(conf, RefreshableCredentialsProvider.class); + conf.set("fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role"); + + assertThatThrownBy(() -> new S3DelegationTokenProvider("s3", conf)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("AssumeRole") + .hasMessageContaining("custom AWS credentials provider"); + } + + @Test + void testConfiguredProviderCredentialsAreResolvedForEachUse() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.s3a.region", "us-east-1"); + setConfiguredProvider(conf, RefreshableCredentialsProvider.class); + RefreshableCredentialsProvider.setCredentials("firstAccessKey", "firstSecretKey"); + S3DelegationTokenProvider provider = new S3DelegationTokenProvider("s3", conf); + + AWSCredentials firstCredentials = provider.createStsCredentialsProvider().getCredentials(); + RefreshableCredentialsProvider.setCredentials("secondAccessKey", "secondSecretKey"); + AWSCredentials secondCredentials = provider.createStsCredentialsProvider().getCredentials(); + + assertThat(firstCredentials.getAWSAccessKeyId()).isEqualTo("firstAccessKey"); + assertThat(firstCredentials.getAWSSecretKey()).isEqualTo("firstSecretKey"); + assertThat(secondCredentials.getAWSAccessKeyId()).isEqualTo("secondAccessKey"); + assertThat(secondCredentials.getAWSSecretKey()).isEqualTo("secondSecretKey"); + } + + @Test + void testConfiguredProviderTakesPrecedenceOverStaticCredentials() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.s3a.region", "us-east-1"); + conf.set("fs.s3a.access.key", "staticAccessKey"); + conf.set("fs.s3a.secret.key", "staticSecretKey"); + setConfiguredProvider(conf, RefreshableCredentialsProvider.class); + RefreshableCredentialsProvider.setCredentials("providerAccessKey", "providerSecretKey"); + S3DelegationTokenProvider provider = new S3DelegationTokenProvider("s3", conf); + + AWSCredentials credentials = provider.createStsCredentialsProvider().getCredentials(); + + assertThat(credentials.getAWSAccessKeyId()).isEqualTo("providerAccessKey"); + assertThat(credentials.getAWSSecretKey()).isEqualTo("providerSecretKey"); + } + + @Test + void testConfiguredProviderReturningSessionCredentialsThrowsBeforeSts() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.s3a.region", "us-east-1"); + setConfiguredProvider(conf, SessionCredentialsProvider.class); + S3DelegationTokenProvider provider = new S3DelegationTokenProvider("s3", conf); + + assertThatThrownBy(provider::createStsCredentialsProvider) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Session credentials") + .hasMessageNotContaining("sessionSecretKey") + .hasMessageNotContaining("sessionToken"); + } + + @Test + void testConfiguredProviderWithUriConfigurationConstructorIsSupported() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.s3a.region", "us-east-1"); + conf.set("test.config.value", "configured-value"); + setConfiguredProvider(conf, UriConfigurationCredentialsProvider.class); + + new S3DelegationTokenProvider("s3", conf); + + assertThat(UriConfigurationCredentialsProvider.uri).isNull(); + assertThat(UriConfigurationCredentialsProvider.configuredValue) + .isEqualTo("configured-value"); + } + + private static void setConfiguredProvider( + Configuration conf, Class providerClass) { + conf.set(PROVIDER_CONFIG, providerClass.getName()); + conf.setBoolean(S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, true); + } + + /** Refreshable credentials provider for tests. */ + public static class RefreshableCredentialsProvider implements AWSCredentialsProvider { + private static volatile AWSCredentials credentials = + new BasicAWSCredentials("defaultAccessKey", "defaultSecretKey"); + + static void setCredentials(String accessKey, String secretKey) { + credentials = new BasicAWSCredentials(accessKey, secretKey); + } + + @Override + public AWSCredentials getCredentials() { + return credentials; + } + + @Override + public void refresh() {} + } + + /** Session credentials provider for tests. */ + public static class SessionCredentialsProvider implements AWSCredentialsProvider { + + @Override + public AWSSessionCredentials getCredentials() { + return new BasicSessionCredentials( + "sessionAccessKey", "sessionSecretKey", "sessionToken"); + } + + @Override + public void refresh() {} + } + + /** Credentials provider with the Hadoop S3A URI/configuration constructor form. */ + public static class UriConfigurationCredentialsProvider implements AWSCredentialsProvider { + private static volatile URI uri; + private static volatile String configuredValue; + + public UriConfigurationCredentialsProvider(URI uri, Configuration conf) { + UriConfigurationCredentialsProvider.uri = uri; + UriConfigurationCredentialsProvider.configuredValue = conf.get("test.config.value"); + } + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials("uriAccessKey", "uriSecretKey"); + } + + @Override + public void refresh() {} + } }