Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

/**
* Implementation of the Fluss {@link FileSystem} interface for S3. This class implements the common
* behavior implemented directly by Fluss and delegates common calls to an implementation of
Expand Down Expand Up @@ -52,7 +54,7 @@ public S3FileSystem(
}

@Override
public ObtainedSecurityToken obtainSecurityToken() {
public ObtainedSecurityToken obtainSecurityToken() throws IOException {
if (s3DelegationTokenProvider == null) {
synchronized (this) {
if (s3DelegationTokenProvider == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FileSystemPlugin;
import org.apache.fluss.fs.s3.token.S3ADelegationTokenReceiver;
import org.apache.fluss.fs.s3.token.S3DelegationTokenProvider;
import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Objects;

import static org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
Expand All @@ -42,6 +45,10 @@ public class S3FileSystemPlugin implements FileSystemPlugin {

private static final String[] FLUSS_CONFIG_PREFIXES = {"s3.", "s3a.", "fs.s3a."};

private static final String[] CREDENTIAL_PROVIDER_CONFIG_KEYS = {
"s3.aws.credentials.provider", "s3a.aws.credentials.provider", PROVIDER_CONFIG_NAME
};

private static final String HADOOP_CONFIG_PREFIX = "fs.s3a.";

private static final String ACCESS_KEY_ID = "fs.s3a.access.key";
Expand Down Expand Up @@ -74,6 +81,12 @@ public FileSystem create(URI fsUri, Configuration flussConfig) throws IOExceptio
org.apache.hadoop.conf.Configuration buildHadoopConfiguration(Configuration flussConfig) {
org.apache.hadoop.conf.Configuration hadoopConfig =
mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));
boolean hasCredentialProvider = hasConfiguredCredentialProvider(flussConfig);
// Preserve whether the provider came from Fluss config. Token providers should not infer
// explicit server-side provider mode from Hadoop default resources.
hadoopConfig.setBoolean(
S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED,
hasCredentialProvider);
setCredentialProvider(hadoopConfig);
return hadoopConfig;
}
Expand Down Expand Up @@ -130,27 +143,55 @@ private URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopCon
}

private void setCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) {
boolean hasCredentialProvider =
hadoopConfig.getBoolean(
S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, false);
boolean hasStaticKeys =
hadoopConfig.get(ACCESS_KEY_ID) != null
&& hadoopConfig.get(ACCESS_KEY_SECRET) != null;
boolean hasRoleArn = hadoopConfig.get(ROLE_ARN_KEY) != null;

if (hasCredentialProvider) {
if (hasRoleArn) {
throw new IllegalArgumentException(
"AssumeRole and a custom AWS credentials provider cannot be configured together.");
}
LOG.info(
"Using configured AWS credential provider(s) for server-side S3 access: {}",
hadoopConfig.get(PROVIDER_CONFIG_NAME));
return;
}

if (hasStaticKeys || hasRoleArn) {
LOG.info(
hasStaticKeys
? "Using provided static credentials."
: "Using default AWS credential chain with AssumeRole.");
return;
}

if (Objects.equals(getScheme(), "s3")) {
S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
} else if (Objects.equals(getScheme(), "s3a")) {
S3ADelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
} else {
if (Objects.equals(getScheme(), "s3")) {
S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
} else if (Objects.equals(getScheme(), "s3a")) {
S3ADelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
} else {
throw new IllegalArgumentException("Unsupported scheme: " + getScheme());
throw new IllegalArgumentException("Unsupported scheme: " + getScheme());
}
LOG.info(
"Using credential provider {} for delegated tokens.",
hadoopConfig.get(PROVIDER_CONFIG_NAME));
}

private boolean hasConfiguredCredentialProvider(Configuration flussConfig) {
if (flussConfig == null) {
return false;
}
Map<String, String> configMap = flussConfig.toMap();
for (String key : CREDENTIAL_PROVIDER_CONFIG_KEYS) {
if (StringUtils.isNotBlank(configMap.get(key))) {
return true;
}
LOG.info(
"Using credential provider {} for delegated tokens.",
hadoopConfig.get(PROVIDER_CONFIG_NAME));
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.fluss.fs.token.CredentialsJsonSerde;
import org.apache.fluss.fs.token.ObtainedSecurityToken;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
Expand All @@ -29,12 +32,16 @@
import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -49,6 +56,9 @@ public class S3DelegationTokenProvider {

private static final String ACCESS_KEY_ID = "fs.s3a.access.key";
private static final String ACCESS_KEY_SECRET = "fs.s3a.secret.key";
private static final String AWS_CREDENTIALS_PROVIDER = "fs.s3a.aws.credentials.provider";
public static final String CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED =
"fluss.fs.s3.aws.credentials.provider.explicitly.configured";

private static final String REGION_KEY = "fs.s3a.region";
private static final String ENDPOINT_KEY = "fs.s3a.endpoint";
Expand All @@ -63,21 +73,31 @@ public class S3DelegationTokenProvider {
@Nullable private final String secretKey;
@Nullable private final String roleArn;
@Nullable private final String stsEndpoint;
@Nullable private final AWSCredentialProviderList credentialProviderList;
private final Map<String, String> additionInfos;

public S3DelegationTokenProvider(String scheme, Configuration conf) {
public S3DelegationTokenProvider(String scheme, Configuration conf) throws IOException {
this.scheme = scheme;
this.region = conf.get(REGION_KEY);
checkArgument(region != null, "Region is not set.");
this.accessKey = conf.get(ACCESS_KEY_ID);
this.secretKey = conf.get(ACCESS_KEY_SECRET);
this.roleArn = conf.get(ROLE_ARN_KEY);
this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);
boolean hasCredentialProvider =
conf.getBoolean(CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED, false)
&& StringUtils.isNotBlank(conf.getTrimmed(AWS_CREDENTIALS_PROVIDER));

checkArgument(
(accessKey == null) == (secretKey == null),
"S3 access key and secret key must both be set or both be unset.");
if (accessKey == null) {
if (hasCredentialProvider && roleArn != null) {
throw new IllegalArgumentException(
"AssumeRole and a custom AWS credentials provider cannot be configured together.");
}
this.credentialProviderList =
hasCredentialProvider ? S3AUtils.createAWSCredentialProviderSet(null, conf) : null;
if (accessKey == null && credentialProviderList == null) {
checkArgument(
roleArn != null,
"Role ARN must be set when static credentials are not provided.");
Expand Down Expand Up @@ -106,8 +126,10 @@ public ObtainedSecurityToken obtainSecurityToken() {
credentials = result.getCredentials();
} else {
LOG.info(
"Obtaining session credentials via GetSessionToken with access key: {}",
S3TokenLogUtils.maskAccessKey(accessKey));
"Obtaining session credentials via GetSessionToken{}.",
credentialProviderList != null
? " with configured AWS credentials provider"
: " with access key: " + S3TokenLogUtils.maskAccessKey(accessKey));
GetSessionTokenResult result = stsClient.getSessionToken();
credentials = result.getCredentials();
}
Expand All @@ -131,10 +153,9 @@ private AWSSecurityTokenService buildStsClient() {
AWSSecurityTokenServiceClientBuilder builder =
AWSSecurityTokenServiceClientBuilder.standard();

if (accessKey != null && secretKey != null) {
builder.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(accessKey, secretKey)));
AWSCredentialsProvider stsCredentialsProvider = createStsCredentialsProvider();
if (stsCredentialsProvider != null) {
builder.withCredentials(stsCredentialsProvider);
}

if (stsEndpoint != null) {
Expand All @@ -147,6 +168,33 @@ private AWSSecurityTokenService buildStsClient() {
return builder.build();
}

@Nullable
AWSCredentialsProvider createStsCredentialsProvider() {
if (credentialProviderList != null) {
AWSCredentials credentials = credentialProviderList.getCredentials();
checkArgument(
!(credentials instanceof AWSSessionCredentials),
"Session credentials from the configured AWS credentials provider are not supported "
+ "for Fluss S3 client-token generation.");
checkArgument(
credentials.getAWSAccessKeyId() != null
&& credentials.getAWSSecretKey() != null,
"The configured AWS credentials provider must return an access key and secret key.");
LOG.info(
"Using configured AWS credentials provider for STS GetSessionToken with access key: {}",
S3TokenLogUtils.maskAccessKey(credentials.getAWSAccessKeyId()));
return new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey()));
}

if (accessKey != null && secretKey != null) {
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
}

return null;
}

private byte[] toJson(Credentials credentials) {
org.apache.fluss.fs.token.Credentials flussCredentials =
new org.apache.fluss.fs.token.Credentials(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.fluss.config.Configuration;
import org.apache.fluss.fs.s3.token.DynamicTemporaryAWSCredentialsProvider;
import org.apache.fluss.fs.s3.token.S3DelegationTokenProvider;
import org.apache.fluss.fs.s3.token.S3DelegationTokenProviderTest;
import org.apache.fluss.fs.s3.token.S3DelegationTokenReceiver;
import org.apache.fluss.fs.token.Credentials;
import org.apache.fluss.fs.token.CredentialsJsonSerde;
Expand All @@ -29,6 +31,7 @@
import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for server/client detection in {@link S3FileSystemPlugin}. */
class S3FileSystemPluginTest {
Expand Down Expand Up @@ -63,6 +66,66 @@ void testServerModeWithRoleArnOnly() {
assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
}

@Test
void testServerModeWithConfiguredCredentialProvider() {
Configuration flussConfig = new Configuration();
flussConfig.setString(
PROVIDER_CONFIG,
S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class.getName());

S3FileSystemPlugin plugin = new S3FileSystemPlugin();
org.apache.hadoop.conf.Configuration hadoopConfig =
plugin.buildHadoopConfiguration(flussConfig);

String providers = hadoopConfig.get(PROVIDER_CONFIG, "");
assertThat(providers)
.isEqualTo(
S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class
.getName());
assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
assertThat(
hadoopConfig.getBoolean(
S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED,
false))
.isTrue();
}

@Test
void testServerModeWithConfiguredCredentialProviderForS3A() {
Configuration flussConfig = new Configuration();
flussConfig.setString(
PROVIDER_CONFIG,
S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class.getName());

S3AFileSystemPlugin plugin = new S3AFileSystemPlugin();
org.apache.hadoop.conf.Configuration hadoopConfig =
plugin.buildHadoopConfiguration(flussConfig);

String providers = hadoopConfig.get(PROVIDER_CONFIG, "");
assertThat(providers)
.isEqualTo(
S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class
.getName());
assertThat(providers).doesNotContain(DynamicTemporaryAWSCredentialsProvider.NAME);
}

@Test
void testConfiguredCredentialProviderWithRoleArnThrows() {
Configuration flussConfig = new Configuration();
flussConfig.setString(
PROVIDER_CONFIG,
S3DelegationTokenProviderTest.RefreshableCredentialsProvider.class.getName());
flussConfig.setString(
"fs.s3a.assumed.role.arn", "arn:aws:iam::123456789012:role/test-role");

S3FileSystemPlugin plugin = new S3FileSystemPlugin();

assertThatThrownBy(() -> plugin.buildHadoopConfiguration(flussConfig))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("AssumeRole")
.hasMessageContaining("custom AWS credentials provider");
}

@Test
void testClientModeWithDelegatedCredentials() {
// Pre-populate receiver so updateHadoopConfig does not throw.
Expand All @@ -84,5 +147,10 @@ void testClientModeWithDelegatedCredentials() {

String providers = hadoopConfig.get(PROVIDER_CONFIG, "");
assertThat(providers).contains(DynamicTemporaryAWSCredentialsProvider.NAME);
assertThat(
hadoopConfig.getBoolean(
S3DelegationTokenProvider.CREDENTIAL_PROVIDER_EXPLICITLY_CONFIGURED,
false))
.isFalse();
}
}
Loading
Loading