diff --git a/sdk/spring/CHANGELOG.md b/sdk/spring/CHANGELOG.md index 41ad0d1cc57d..b53a1a66f288 100644 --- a/sdk/spring/CHANGELOG.md +++ b/sdk/spring/CHANGELOG.md @@ -26,6 +26,18 @@ This section includes changes in `spring-cloud-azure-stream-binder-servicebus` m - Add support for injecting a custom `RetryTemplate` from Spring context for advanced retry scenarios. [#47135](https://github.com/Azure/azure-sdk-for-java/issues/47135). +### Spring Cloud Azure Service + +This section includes changes in `spring-cloud-azure-service` module. + +#### Features Added + +- Support `AzurePipelinesCredential` in Azure Event Hubs for Kafka passwordless connection ([#49108](https://github.com/Azure/azure-sdk-for-java/pull/49108)). It only takes effect when all the following 4 environment variables exist at runtime: + - `AZURESUBSCRIPTION_SERVICE_CONNECTION_ID` + - `AZURESUBSCRIPTION_CLIENT_ID` + - `AZURESUBSCRIPTION_TENANT_ID` + - `SYSTEM_ACCESSTOKEN` + ## 6.3.0 (2026-04-29) - This release is compatible with Spring Boot 3.5.0-3.5.14. (Note: 3.5.x (x>14) should be supported, but they aren't tested with this release.) - This release is compatible with Spring Cloud 2025.0.0-2025.0.2. (Note: 2025.0.x (x>2) should be supported, but they aren't tested with this release.) diff --git a/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderOAuthIT.java b/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderOAuthIT.java index 0654af2461b7..cfc3aaa4bc09 100644 --- a/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderOAuthIT.java +++ b/sdk/spring/spring-cloud-azure-integration-tests/src/test/java/com/azure/spring/cloud/integration/tests/eventhubs/binder/EventHubsKafkaBinderOAuthIT.java @@ -5,7 +5,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +28,6 @@ @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) @ActiveProfiles("eventhubs-kafka-binder-oauth") -@Disabled("Pipeline oauth is not enabled now") class EventHubsKafkaBinderOAuthIT { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsKafkaBinderOAuthIT.class); diff --git a/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandler.java b/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandler.java index 3f325e33a862..72db1274eb32 100644 --- a/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandler.java +++ b/sdk/spring/spring-cloud-azure-service/src/main/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandler.java @@ -4,6 +4,10 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.credential.TokenRequestContext; +import com.azure.core.util.Configuration; +import com.azure.core.util.logging.ClientLogger; +import com.azure.identity.AzurePipelinesCredentialBuilder; +import com.azure.identity.ChainedTokenCredentialBuilder; import com.azure.spring.cloud.core.credential.AzureCredentialResolver; import com.azure.spring.cloud.core.implementation.credential.resolver.AzureTokenCredentialResolver; import com.azure.spring.cloud.core.implementation.factory.credential.DefaultAzureCredentialBuilderFactory; @@ -111,6 +115,8 @@ public void close() { } private static class InternalCredentialResolver implements AzureCredentialResolver { + private static final ClientLogger LOGGER = new ClientLogger(InternalCredentialResolver.class); + private final AzureCredentialResolver delegated; private final Map configs; private TokenCredential credential; @@ -128,8 +134,16 @@ public TokenCredential resolve(AzureProperties properties) { if (credential == null) { credential = delegated.resolve(properties); if (credential == null) { - // Create DefaultAzureCredential when no credential can be resolved from configs. - credential = new DefaultAzureCredentialBuilderFactory(properties).build().build(); + TokenCredential defaultAzureCredential = new DefaultAzureCredentialBuilderFactory(properties).build().build(); + TokenCredential pipelinesCredential = tryBuildAzurePipelinesCredential(properties); + if (pipelinesCredential == null) { + credential = defaultAzureCredential; + } else { + credential = new ChainedTokenCredentialBuilder() + .addLast(pipelinesCredential) + .addLast(defaultAzureCredential) + .build(); + } } } } @@ -140,5 +154,55 @@ public TokenCredential resolve(AzureProperties properties) { public boolean isResolvable(AzureProperties properties) { return true; } + + /** + * Attempts to build an {@code AzurePipelinesCredential} from the Azure DevOps federated + * workload-identity environment variables. Returns {@code null} when any of the four + * caller-provided variables ({@code AZURESUBSCRIPTION_SERVICE_CONNECTION_ID}, + * {@code AZURESUBSCRIPTION_CLIENT_ID}, {@code AZURESUBSCRIPTION_TENANT_ID}, + * {@code SYSTEM_ACCESSTOKEN}) are missing, or when {@code AzurePipelinesCredentialBuilder#build()} + * itself fails (e.g. {@code SYSTEM_OIDCREQUESTURI} is unavailable outside an Azure DevOps + * job). The authority host is taken from the {@link AzureProperties} profile so that the + * credential targets the correct cloud (public, China, US Gov). + */ + private static TokenCredential tryBuildAzurePipelinesCredential(AzureProperties properties) { + Configuration config = Configuration.getGlobalConfiguration(); + String serviceConnectionId = config.get("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID"); + String clientId = config.get("AZURESUBSCRIPTION_CLIENT_ID"); + String tenantId = config.get("AZURESUBSCRIPTION_TENANT_ID"); + String systemAccessToken = config.get("SYSTEM_ACCESSTOKEN"); + if (isNullOrEmpty(serviceConnectionId) + || isNullOrEmpty(clientId) + || isNullOrEmpty(tenantId) + || isNullOrEmpty(systemAccessToken)) { + return null; + } + try { + AzurePipelinesCredentialBuilder builder = new AzurePipelinesCredentialBuilder() + .systemAccessToken(systemAccessToken) + .clientId(clientId) + .tenantId(tenantId) + .serviceConnectionId(serviceConnectionId); + String authorityHost = resolveAuthorityHost(properties); + if (!isNullOrEmpty(authorityHost)) { + builder.authorityHost(authorityHost); + } + return builder.build(); + } catch (RuntimeException e) { + LOGGER.verbose("Failed to build AzurePipelinesCredential, will fall back to DefaultAzureCredential.", e); + return null; + } + } + + private static String resolveAuthorityHost(AzureProperties properties) { + if (properties == null || properties.getProfile() == null || properties.getProfile().getEnvironment() == null) { + return null; + } + return properties.getProfile().getEnvironment().getActiveDirectoryEndpoint(); + } + + private static boolean isNullOrEmpty(String value) { + return value == null || value.isEmpty(); + } } } diff --git a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandlerTest.java b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandlerTest.java index 26e4317da6dc..12e23f938aa2 100644 --- a/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandlerTest.java +++ b/sdk/spring/spring-cloud-azure-service/src/test/java/com/azure/spring/cloud/service/implementation/kafka/KafkaOAuth2AuthenticateCallbackHandlerTest.java @@ -5,6 +5,8 @@ import com.azure.core.credential.AccessToken; import com.azure.core.credential.TokenCredential; import com.azure.core.credential.TokenRequestContext; +import com.azure.core.util.Configuration; +import com.azure.identity.ChainedTokenCredential; import com.azure.identity.DefaultAzureCredential; import com.azure.identity.ManagedIdentityCredential; import com.azure.spring.cloud.core.credential.AzureCredentialResolver; @@ -14,6 +16,8 @@ import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; +import org.junit.jupiter.api.parallel.Resources; import org.mockito.Mockito; import org.springframework.test.util.ReflectionTestUtils; import reactor.core.publisher.Mono; @@ -82,6 +86,41 @@ void testCreateDefaultTokenCredential() { assertTrue(azureTokenCredentialResolver.resolve(properties) instanceof DefaultAzureCredential); } + @Test + @ResourceLock(value = Resources.GLOBAL, mode = org.junit.jupiter.api.parallel.ResourceAccessMode.READ_WRITE) + @SuppressWarnings("deprecation") + void testCreateChainedTokenCredentialWhenAzurePipelinesEnvVarsPresent() { + // Use Configuration.put rather than System.setProperty because Configuration internally + // caches system-property/env-var lookups; only the explicitConfigurations map (populated + // by put) is cleared by remove, giving us proper per-test isolation. + Configuration globalConfiguration = Configuration.getGlobalConfiguration(); + globalConfiguration.put("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID", "test-service-connection-id"); + globalConfiguration.put("AZURESUBSCRIPTION_CLIENT_ID", "00000000-0000-0000-0000-000000000000"); + globalConfiguration.put("AZURESUBSCRIPTION_TENANT_ID", "11111111-1111-1111-1111-111111111111"); + globalConfiguration.put("SYSTEM_ACCESSTOKEN", "test-system-access-token"); + // AzurePipelinesCredentialBuilder.build() also validates SYSTEM_OIDCREQUESTURI. + globalConfiguration.put("SYSTEM_OIDCREQUESTURI", "https://example.test/oidc"); + try { + Map configs = new HashMap<>(); + configs.put(BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER); + KafkaOAuth2AuthenticateCallbackHandler handler = new KafkaOAuth2AuthenticateCallbackHandler(); + handler.configure(configs, null, null); + + AzurePasswordlessProperties properties = (AzurePasswordlessProperties) ReflectionTestUtils + .getField(handler, AZURE_THIRD_PARTY_SERVICE_PROPERTIES_FIELD_NAME); + @SuppressWarnings("unchecked") AzureCredentialResolver azureTokenCredentialResolver = + (AzureCredentialResolver) ReflectionTestUtils.getField(handler, TOKEN_CREDENTIAL_RESOLVER_FIELD_NAME); + assertNotNull(azureTokenCredentialResolver); + assertTrue(azureTokenCredentialResolver.resolve(properties) instanceof ChainedTokenCredential); + } finally { + globalConfiguration.remove("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID"); + globalConfiguration.remove("AZURESUBSCRIPTION_CLIENT_ID"); + globalConfiguration.remove("AZURESUBSCRIPTION_TENANT_ID"); + globalConfiguration.remove("SYSTEM_ACCESSTOKEN"); + globalConfiguration.remove("SYSTEM_OIDCREQUESTURI"); + } + } + @Test void testCreateTokenCredentialByResolver() { Map configs = new HashMap<>();