Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ class BeamModulePlugin implements Plugin<Project> {
def gax_version = "2.79.0"
def google_ads_version = "33.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "2.2.26"
def google_cloud_bigdataoss_version = "3.1.16"
def google_code_gson_version = "2.10.1"
def google_oauth_clients_version = "1.34.1"
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
Expand Down Expand Up @@ -702,9 +702,9 @@ class BeamModulePlugin implements Plugin<Project> {
aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version",
azure_sdk_bom : "com.azure:azure-sdk-bom:1.2.14",
bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version",
bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:$google_cloud_bigdataoss_version",
bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version",
bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:$google_cloud_bigdataoss_version",
byte_buddy : "net.bytebuddy:byte-buddy:1.17.7",
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
Expand All @@ -53,7 +52,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
Expand Down Expand Up @@ -267,8 +265,12 @@ public boolean shouldRetry(IOException e) {
.setReadChannelOptions(gcsReadOptions)
.setGrpcEnabled(shouldUseGrpc)
.build();
googleCloudStorage =
createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
try {
googleCloudStorage =
createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
} catch (IOException e) {
throw new RuntimeException(e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Wrapping an IOException in a generic RuntimeException without a descriptive message makes debugging harder. It is recommended to provide context about the failure, similar to the error message used in the previous reflection-based implementation.

Suggested change
throw new RuntimeException(e);
throw new RuntimeException("Failed to create GoogleCloudStorage", e);

}
this.batchRequestSupplier =
() -> {
// Capture reference to this so that the most recent storageClient and initializer
Expand Down Expand Up @@ -725,48 +727,16 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO
}

GoogleCloudStorage createGoogleCloudStorage(
GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
try {
return new GoogleCloudStorageImpl(options, storage, credentials);
} catch (NoSuchMethodError e) {
// gcs-connector 3.x drops the direct constructor and exclusively uses Builder
// TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x
try {
final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder");
Object builder = builderMethod.invoke(null);
final Class<?> builderClass =
Class.forName(
"com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder");

final Method setOptionsMethod =
builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class);
setOptionsMethod.setAccessible(true);
builder = setOptionsMethod.invoke(builder, options);

final Method setHttpTransportMethod =
builderClass.getMethod("setHttpTransport", HttpTransport.class);
setHttpTransportMethod.setAccessible(true);
builder =
setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport());

final Method setCredentialsMethod =
builderClass.getMethod("setCredentials", Credentials.class);
setCredentialsMethod.setAccessible(true);
builder = setCredentialsMethod.invoke(builder, credentials);

final Method setHttpRequestInitializerMethod =
builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class);
setHttpRequestInitializerMethod.setAccessible(true);
builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer);

final Method buildMethod = builderClass.getMethod("build");
buildMethod.setAccessible(true);
return (GoogleCloudStorage) buildMethod.invoke(builder);
} catch (Exception reflectionError) {
throw new RuntimeException(
"Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError);
}
}
GoogleCloudStorageOptions options, Storage storage, Credentials credentials)
throws IOException {
return GoogleCloudStorageImpl.builder()
.setOptions(options)
.setHttpTransport(storage.getRequestFactory().getTransport())
.setCredentials(credentials)
// gcsio 3 expects httpRequestInitializer to be either absent or
// com.google.cloud.hadoop.util.RetryHttpInitializer when credentials not provided
.setHttpRequestInitializer(credentials != null ? httpRequestInitializer : null)
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,19 @@ public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Excep
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder()
.setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
.setSupportGzipEncoding(true)
.setFastFailOnNotFound(false)
.setGzipEncodingSupportEnabled(true)
.setFastFailOnNotFoundEnabled(false)
.build();

GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGoogleCloudStorageReadOptions(readOptions);

GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class);
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
Mockito.when(
googleCloudStorageMock.open(
Mockito.any(StorageResourceId.class),
Mockito.any(GoogleCloudStorageReadOptions.class)))
.thenReturn(Mockito.mock(SeekableByteChannel.class));
gcsUtil.delegate.setCloudStorageImpl(googleCloudStorageMock);

Expand Down Expand Up @@ -1006,7 +1009,7 @@ public void testGCSChannelCloseIdempotent() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build();
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(false).build();

gcsUtil.delegate.setCloudStorageImpl(
GoogleCloudStorageOptions.builder()
Expand All @@ -1026,7 +1029,7 @@ public void testGCSReadMetricsIsSet() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(true).build();
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(true).build();
gcsUtil.delegate.setCloudStorageImpl(
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
Expand Down Expand Up @@ -1673,8 +1676,10 @@ public static GcsUtilV1Mock createMockWithMockStorage(
.thenReturn(Channels.newChannel(new ByteArrayOutputStream()));
} else {
SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readPayload);
Mockito.when(googleCloudStorageMock.open(Mockito.any())).thenReturn(seekableByteChannel);
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
Mockito.when(googleCloudStorageMock.open(Mockito.any(StorageResourceId.class)))
.thenReturn(seekableByteChannel);
Mockito.when(
googleCloudStorageMock.open(Mockito.any(StorageResourceId.class), Mockito.any()))
.thenReturn(seekableByteChannel);
}
return gcsUtilMock;
Expand Down
Loading