diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index e32d1c9afb77..ecd81b201fdc 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -611,7 +611,7 @@ class BeamModulePlugin implements Plugin { def gax_version = "2.79.0" def google_ads_version = "33.0.0" def google_clients_version = "2.0.0" - def google_cloud_bigdataoss_version = "2.2.26" + def google_cloud_bigdataoss_version = "3.1.16" def google_code_gson_version = "2.10.1" def google_oauth_clients_version = "1.34.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom @@ -702,9 +702,9 @@ class BeamModulePlugin implements Plugin { aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version", azure_sdk_bom : "com.azure:azure-sdk-bom:1.2.14", bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version", - bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version", + bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:$google_cloud_bigdataoss_version", bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version", - bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version", + bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:$google_cloud_bigdataoss_version", byte_buddy : "net.bytebuddy:byte-buddy:1.17.7", cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version", cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version", diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java index 1ade4be6fdb5..97778ac4e1df 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java @@ -30,7 +30,6 @@ import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpStatusCodes; -import com.google.api.client.http.HttpTransport; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; @@ -53,7 +52,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.reflect.Method; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; @@ -267,8 +265,12 @@ public boolean shouldRetry(IOException e) { .setReadChannelOptions(gcsReadOptions) .setGrpcEnabled(shouldUseGrpc) .build(); - googleCloudStorage = - createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); + try { + googleCloudStorage = + createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); + } catch (IOException e) { + throw new RuntimeException(e); + } this.batchRequestSupplier = () -> { // Capture reference to this so that the most recent storageClient and initializer @@ -725,48 +727,16 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO } GoogleCloudStorage createGoogleCloudStorage( - GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { - try { - return new GoogleCloudStorageImpl(options, storage, credentials); - } catch (NoSuchMethodError e) { - // gcs-connector 3.x drops the direct constructor and exclusively uses Builder - // TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x - try { - final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder"); - Object builder = builderMethod.invoke(null); - final Class builderClass = - Class.forName( - "com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder"); - - final Method setOptionsMethod = - builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class); - setOptionsMethod.setAccessible(true); - builder = setOptionsMethod.invoke(builder, options); - - final Method setHttpTransportMethod = - builderClass.getMethod("setHttpTransport", HttpTransport.class); - setHttpTransportMethod.setAccessible(true); - builder = - setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport()); - - final Method setCredentialsMethod = - builderClass.getMethod("setCredentials", Credentials.class); - setCredentialsMethod.setAccessible(true); - builder = setCredentialsMethod.invoke(builder, credentials); - - final Method setHttpRequestInitializerMethod = - builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class); - setHttpRequestInitializerMethod.setAccessible(true); - builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer); - - final Method buildMethod = builderClass.getMethod("build"); - buildMethod.setAccessible(true); - return (GoogleCloudStorage) buildMethod.invoke(builder); - } catch (Exception reflectionError) { - throw new RuntimeException( - "Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError); - } - } + GoogleCloudStorageOptions options, Storage storage, Credentials credentials) + throws IOException { + return GoogleCloudStorageImpl.builder() + .setOptions(options) + .setHttpTransport(storage.getRequestFactory().getTransport()) + .setCredentials(credentials) + // gcsio 3 expects httpRequestInitializer to be either absent or + // com.google.cloud.hadoop.util.RetryHttpInitializer when credentials not provided + .setHttpRequestInitializer(credentials != null ? httpRequestInitializer : null) + .build(); } /** diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index a2b0e0af502b..d32ca162e3fd 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -184,8 +184,8 @@ public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Excep GoogleCloudStorageReadOptions readOptions = GoogleCloudStorageReadOptions.builder() .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) - .setSupportGzipEncoding(true) - .setFastFailOnNotFound(false) + .setGzipEncodingSupportEnabled(true) + .setFastFailOnNotFoundEnabled(false) .build(); GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); @@ -193,7 +193,10 @@ public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Excep GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); - Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) + Mockito.when( + googleCloudStorageMock.open( + Mockito.any(StorageResourceId.class), + Mockito.any(GoogleCloudStorageReadOptions.class))) .thenReturn(Mockito.mock(SeekableByteChannel.class)); gcsUtil.delegate.setCloudStorageImpl(googleCloudStorageMock); @@ -1006,7 +1009,7 @@ public void testGCSChannelCloseIdempotent() throws IOException { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorageReadOptions readOptions = - GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build(); + GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(false).build(); gcsUtil.delegate.setCloudStorageImpl( GoogleCloudStorageOptions.builder() @@ -1026,7 +1029,7 @@ public void testGCSReadMetricsIsSet() { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorageReadOptions readOptions = - GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(true).build(); + GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(true).build(); gcsUtil.delegate.setCloudStorageImpl( GoogleCloudStorageOptions.builder() .setAppName("Beam") @@ -1673,8 +1676,10 @@ public static GcsUtilV1Mock createMockWithMockStorage( .thenReturn(Channels.newChannel(new ByteArrayOutputStream())); } else { SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readPayload); - Mockito.when(googleCloudStorageMock.open(Mockito.any())).thenReturn(seekableByteChannel); - Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) + Mockito.when(googleCloudStorageMock.open(Mockito.any(StorageResourceId.class))) + .thenReturn(seekableByteChannel); + Mockito.when( + googleCloudStorageMock.open(Mockito.any(StorageResourceId.class), Mockito.any())) .thenReturn(seekableByteChannel); } return gcsUtilMock;