Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions azure-blob-payloads/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
plugins {
id 'java-library'
id 'maven-publish'
id 'signing'
id 'com.github.spotbugs' version '6.4.8'
}

group 'com.microsoft'
version = '1.8.0'
archivesBaseName = 'durabletask-azure-blob-payloads'

def grpcVersion = '1.78.0'
def azureCoreVersion = '1.57.1'
def azureStorageBlobVersion = '12.29.1'

// Java 11 is used to compile and run all tests. Set the JDK_11 env var to your
// local JDK 11 home directory, e.g. C:/Program Files/Java/openjdk-11.0.12_7/
// If unset, falls back to the current JDK running Gradle.
def rawJdkPath = System.env.JDK_11 ?: System.getProperty("java.home")
def PATH_TO_TEST_JAVA_RUNTIME = rawJdkPath
if (rawJdkPath != null) {
def f = new File(rawJdkPath)
if (f.isFile()) {
PATH_TO_TEST_JAVA_RUNTIME = f.parentFile.parentFile.absolutePath
}
}
def isWindows = System.getProperty("os.name").toLowerCase().contains("win")
def exeSuffix = isWindows ? ".exe" : ""

dependencies {
api project(':client')

// Azure Storage Blobs
implementation "com.azure:azure-storage-blob:${azureStorageBlobVersion}"

// TokenCredential abstraction (from azure-core)
implementation "com.azure:azure-core:${azureCoreVersion}"

// gRPC interceptor API
implementation "io.grpc:grpc-api:${grpcVersion}"
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"

// NOTE: azure-identity is NOT included here. Users who need
// DefaultAzureCredential should add it to their own project.

testImplementation 'org.mockito:mockito-core:5.21.0'
testImplementation 'org.mockito:mockito-junit-jupiter:5.21.0'
testImplementation project(':azuremanaged')
}

compileJava {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
compileTestJava {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
options.fork = true
options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac${exeSuffix}"
}

tasks.withType(Test) {
executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", "bin/java${exeSuffix}")
}

test {
useJUnitPlatform {
// Skip tests tagged as "integration" since those require
// external dependencies (DTS emulator + Azurite).
excludeTags "integration"
}
}

// Integration tests require DTS emulator (default localhost:8080) and Azurite on localhost:10000.
task integrationTest(type: Test) {
useJUnitPlatform {
includeTags 'integration'
}
dependsOn build
shouldRunAfter test
testLogging.showStandardStreams = true
ignoreFailures = false
}

spotbugs {
toolVersion = '4.9.8'
effort = com.github.spotbugs.snom.Effort.valueOf('MAX')
reportLevel = com.github.spotbugs.snom.Confidence.valueOf('HIGH')
ignoreFailures = true
excludeFilter = file('spotbugs-exclude.xml')
}

spotbugsMain {
reports {
html {
required = true
stylesheet = 'fancy-hist.xsl'
}
xml {
required = true
}
}
}

spotbugsTest {
reports {
html {
required = true
stylesheet = 'fancy-hist.xsl'
}
xml {
required = true
}
}
}
17 changes: 17 additions & 0 deletions azure-blob-payloads/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
<!-- Exclude test classes -->
<Match>
<Class name="~.*Test"/>
</Match>

<!-- Exclude common false positives -->
<Match>
<BugPattern name="DM_CONVERT_CASE"/>
</Match>

<!-- Exclude serialization related warnings -->
<Match>
<BugPattern name="SE_NO_SERIALVERSIONID"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask.azureblobpayloads;

import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobDownloadResponse;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* Azure Blob Storage implementation of {@link PayloadStore}.
* <p>
* Stores payloads as blobs and returns opaque tokens in the form {@code blob:v1:<container>:<blobName>}.
* Supports optional gzip compression. The blob container is created automatically on first upload.
*/
public final class BlobPayloadStore extends PayloadStore {

static final String TOKEN_PREFIX = "blob:v1:";
private static final String CONTENT_ENCODING_GZIP = "gzip";

private final BlobContainerClient containerClient;
private final LargePayloadStorageOptions options;

/**
* Creates a new {@code BlobPayloadStore} from the given options.
*
* @param options the storage options
* @throws IllegalArgumentException if neither connection string nor account URI/credential are provided
*/
public BlobPayloadStore(LargePayloadStorageOptions options) {
if (options == null) {
throw new IllegalArgumentException("options must not be null.");
}

String containerName = options.getContainerName();
if (containerName == null || containerName.isEmpty()) {
throw new IllegalArgumentException("Container name must not be null or empty.");
}

boolean hasConnectionString = options.getConnectionString() != null
&& !options.getConnectionString().isEmpty();
boolean hasIdentityAuth = options.getAccountUri() != null && options.getCredential() != null;

if (!hasConnectionString && !hasIdentityAuth) {
throw new IllegalArgumentException(
"Either ConnectionString or AccountUri and Credential must be provided.");
}

// Retry policy: exponential (8 retries, 250ms base, 10s max, 2min network timeout)
// Matches the .NET BlobPayloadStore retry configuration.
RequestRetryOptions retryOptions = new RequestRetryOptions(
RetryPolicyType.EXPONENTIAL,
8, // maxTries
120, // tryTimeoutInSeconds (2 min network timeout)
250L, // retryDelayInMs (250ms base)
10_000L, // maxRetryDelayInMs (10s max)
null); // secondaryHost

BlobServiceClient serviceClient;
if (hasIdentityAuth) {
serviceClient = new BlobServiceClientBuilder()
.endpoint(options.getAccountUri().toString())
.credential(options.getCredential())
.retryOptions(retryOptions)
.buildClient();
} else {
serviceClient = new BlobServiceClientBuilder()
.connectionString(options.getConnectionString())
.retryOptions(retryOptions)
.buildClient();
}

this.containerClient = serviceClient.getBlobContainerClient(containerName);
this.options = options;
}

/**
* Package-private constructor for testing with an injected {@link BlobContainerClient}.
*/
BlobPayloadStore(BlobContainerClient containerClient, LargePayloadStorageOptions options) {
this.containerClient = containerClient;
this.options = options;
}

@Override
public String upload(String payload) {
String blobName = UUID.randomUUID().toString().replace("-", "");
BlobClient blob = this.containerClient.getBlobClient(blobName);

byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);

// Ensure container exists (idempotent)
try {
this.containerClient.createIfNotExists();
} catch (BlobStorageException e) {
// 409 Conflict means it already exists — safe to ignore
if (e.getStatusCode() != 409) {
throw new PayloadStorageException(
"Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
}
}
Comment thread
bachuv marked this conversation as resolved.
Comment thread
bachuv marked this conversation as resolved.
Comment thread
bachuv marked this conversation as resolved.
Comment thread
bachuv marked this conversation as resolved.

try {
if (this.options.isCompressionEnabled()) {
ByteArrayOutputStream compressedBuffer = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBuffer)) {
gzip.write(payloadBytes);
}
byte[] compressedBytes = compressedBuffer.toByteArray();
BlobHttpHeaders headers = new BlobHttpHeaders().setContentEncoding(CONTENT_ENCODING_GZIP);
try (InputStream stream = new ByteArrayInputStream(compressedBytes)) {
blob.uploadWithResponse(
stream,
compressedBytes.length,
null, // parallelTransferOptions
headers,
null, // metadata
null, // tier
null, // requestConditions
null, // timeout
Context.NONE);
}
} else {
try (InputStream stream = new ByteArrayInputStream(payloadBytes)) {
blob.upload(stream, payloadBytes.length, true);
}
}
} catch (IOException e) {
throw new PayloadStorageException("Failed to upload payload blob '" + blobName + "'.", e);
}

return encodeToken(this.containerClient.getBlobContainerName(), blobName);
}

@Override
public String download(String token) {
String[] decoded = decodeToken(token);
String container = decoded[0];
String name = decoded[1];

if (!container.equals(this.containerClient.getBlobContainerName())) {
throw new IllegalArgumentException("Token container does not match configured container.");
}

BlobClient blob = this.containerClient.getBlobClient(name);

try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
// Use downloadStreamWithResponse to get content-encoding header in the same call,
// avoiding a separate getProperties() round-trip.
BlobDownloadResponse downloadResponse = blob.downloadStreamWithResponse(
outputStream,
null, // range (full blob)
null, // options
null, // requestConditions
false, // getMD5
null, // timeout
Context.NONE);
byte[] rawBytes = outputStream.toByteArray();

// Check if the content is gzip-compressed via the response header
String contentEncoding = downloadResponse.getDeserializedHeaders().getContentEncoding();
boolean isGzip = CONTENT_ENCODING_GZIP.equalsIgnoreCase(contentEncoding);

if (isGzip) {
try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(rawBytes));
ByteArrayOutputStream decompressedBuffer = new ByteArrayOutputStream()) {
byte[] buffer = new byte[8192];
int len;
while ((len = gzip.read(buffer)) != -1) {
decompressedBuffer.write(buffer, 0, len);
}
return decompressedBuffer.toString(StandardCharsets.UTF_8.name());
}
}

return new String(rawBytes, StandardCharsets.UTF_8);
} catch (BlobStorageException e) {
if (e.getStatusCode() == 404) {
throw new PayloadStorageException(
"The blob '" + name + "' was not found in container '" + container + "'. " +
"The payload may have been deleted or the container was never created.", e);
}
throw new PayloadStorageException("Failed to download payload blob '" + name + "'.", e);
} catch (IOException e) {
throw new PayloadStorageException("Failed to decompress payload blob '" + name + "'.", e);
}
}

@Override
public boolean isKnownPayloadToken(String value) {
if (value == null || value.isEmpty()) {
return false;
}
return value.startsWith(TOKEN_PREFIX);
}

static String encodeToken(String container, String name) {
return TOKEN_PREFIX + container + ":" + name;
}

static String[] decodeToken(String token) {
if (!token.startsWith(TOKEN_PREFIX)) {
throw new IllegalArgumentException("Invalid external payload token.");
}
String rest = token.substring(TOKEN_PREFIX.length());
int sep = rest.indexOf(':');
if (sep <= 0 || sep >= rest.length() - 1) {
throw new IllegalArgumentException("Invalid external payload token format.");
}
return new String[] { rest.substring(0, sep), rest.substring(sep + 1) };
}
}
Loading
Loading