Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions packages/google-auth/google/auth/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,13 @@ def get_aws_security_credentials(self, context, request):
)

imdsv2_session_token = self._get_imdsv2_session_token(request)
role_name = self._get_metadata_role_name(request, imdsv2_session_token)

# In Fargate environments, the metadata endpoint doesn't use role
# names in the URL structure, so we skip the role name lookup.
if self._is_fargate_environment():
role_name = None
else:
role_name = self._get_metadata_role_name(request, imdsv2_session_token)

# Get security credentials.
credentials = self._get_metadata_security_credentials(
Expand Down Expand Up @@ -535,8 +541,15 @@ def _get_metadata_security_credentials(
else:
headers = None

# In Fargate environments, use the security credentials URL directly
# without appending the role name.
if role_name is not None:
url = "{}/{}".format(self._security_credentials_url, role_name)
else:
url = self._security_credentials_url

response = request(
url="{}/{}".format(self._security_credentials_url, role_name),
url=url,
method="GET",
headers=headers,
)
Expand Down Expand Up @@ -603,6 +616,23 @@ def _get_metadata_role_name(self, request, imdsv2_session_token):

return response_body

@staticmethod
def _is_fargate_environment():
"""Checks if the current environment is an AWS Fargate container.

Fargate containers expose credentials through a different metadata
endpoint structure that doesn't use role names in the URL. This is
detected via ECS-specific environment variables.

Returns:
bool: True if running in a Fargate environment.
"""
return bool(
os.environ.get("ECS_CONTAINER_METADATA_URI_V4")
or os.environ.get("ECS_CONTAINER_METADATA_URI")
or "AWS_ECS_FARGATE" in os.environ.get("AWS_EXECUTION_ENV", "")
)


class Credentials(external_account.Credentials):
"""AWS external account credentials.
Expand Down
158 changes: 158 additions & 0 deletions packages/google-auth/tests/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -2456,3 +2456,161 @@ def test_refresh_success_with_supplier(self, utcnow, mock_auth_lib_value):
assert credentials.quota_project_id == QUOTA_PROJECT_ID
assert credentials.scopes == SCOPES
assert credentials.default_scopes == ["ignored"]

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ, {"ECS_CONTAINER_METADATA_URI_V4": "http://169.254.170.2/v4"}
)
def test_retrieve_subject_token_success_fargate_env_v4(self, utcnow):
"""In Fargate (detected via ECS_CONTAINER_METADATA_URI_V4), the role
name lookup should be skipped and the security credentials URL should
be called directly without appending a role name."""
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
# No role_status/role_name — Fargate skips the role lookup entirely.
request = self.make_mock_request(
region_status=http_client.OK,
region_name=self.AWS_REGION,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
)
credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

subject_token = credentials.retrieve_subject_token(request)

assert subject_token == self.make_serialized_aws_signed_request(
aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# Only 2 metadata requests: region + security credentials (no role).
assert len(request.call_args_list) == 2
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1], REGION_URL
)
# Security credentials URL called directly, without /role_name suffix.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
SECURITY_CREDS_URL,
None,
)

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(
os.environ, {"ECS_CONTAINER_METADATA_URI": "http://169.254.170.2/v3"}
)
def test_retrieve_subject_token_success_fargate_env_v3(self, utcnow):
"""Fargate detected via ECS_CONTAINER_METADATA_URI (v3 endpoint)."""
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
region_status=http_client.OK,
region_name=self.AWS_REGION,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
)
credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

subject_token = credentials.retrieve_subject_token(request)

assert subject_token == self.make_serialized_aws_signed_request(
aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
assert len(request.call_args_list) == 2

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(os.environ, {"AWS_EXECUTION_ENV": "AWS_ECS_FARGATE"})
def test_retrieve_subject_token_success_fargate_execution_env(self, utcnow):
"""Fargate detected via AWS_EXECUTION_ENV containing AWS_ECS_FARGATE."""
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
region_status=http_client.OK,
region_name=self.AWS_REGION,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
)
credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

subject_token = credentials.retrieve_subject_token(request)

assert subject_token == self.make_serialized_aws_signed_request(
aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
assert len(request.call_args_list) == 2

@mock.patch("google.auth._helpers.utcnow")
@mock.patch.dict(os.environ, {}, clear=False)
def test_retrieve_subject_token_non_fargate_still_uses_role_name(self, utcnow):
"""When no Fargate env vars are set, the EC2 path with role name lookup
should be preserved (3 metadata requests)."""
# Remove any Fargate env vars that might leak from the test environment.
for key in (
"ECS_CONTAINER_METADATA_URI_V4",
"ECS_CONTAINER_METADATA_URI",
"AWS_EXECUTION_ENV",
):
os.environ.pop(key, None)

utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
region_status=http_client.OK,
region_name=self.AWS_REGION,
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
)
credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)

subject_token = credentials.retrieve_subject_token(request)

assert subject_token == self.make_serialized_aws_signed_request(
aws.AwsSecurityCredentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, TOKEN)
)
# EC2 path: region, role name, security credentials = 3 requests.
assert len(request.call_args_list) == 3
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
None,
)

def test_is_fargate_environment_detection(self):
"""Unit test for _is_fargate_environment on the supplier directly."""
supplier = aws._DefaultAwsSecurityCredentialsSupplier(self.CREDENTIAL_SOURCE)

# No Fargate env vars set.
with mock.patch.dict(os.environ, {}, clear=True):
assert supplier._is_fargate_environment() is False

# ECS_CONTAINER_METADATA_URI_V4 set.
with mock.patch.dict(
os.environ,
{"ECS_CONTAINER_METADATA_URI_V4": "http://169.254.170.2/v4"},
clear=True,
):
assert supplier._is_fargate_environment() is True

# ECS_CONTAINER_METADATA_URI set.
with mock.patch.dict(
os.environ,
{"ECS_CONTAINER_METADATA_URI": "http://169.254.170.2/v3"},
clear=True,
):
assert supplier._is_fargate_environment() is True

# AWS_EXECUTION_ENV contains AWS_ECS_FARGATE.
with mock.patch.dict(
os.environ, {"AWS_EXECUTION_ENV": "AWS_ECS_FARGATE"}, clear=True
):
assert supplier._is_fargate_environment() is True

# AWS_EXECUTION_ENV set but not Fargate.
with mock.patch.dict(
os.environ, {"AWS_EXECUTION_ENV": "AWS_ECS_EC2"}, clear=True
):
assert supplier._is_fargate_environment() is False
Loading