[Do not Merge] POC for ACO#13158
Conversation
Code Review
This pull request implements a background metadata fetching mechanism to enrich OpenTelemetry spans with bucket-specific resource IDs and locations. It introduces a BucketMetadataCache and decorators for Span and SpanBuilder to handle attribute extraction and application across both gRPC and HTTP transports. Feedback highlights several critical areas for improvement: the use of fragile reflection to access internal gRPC channels, the risks associated with an unbounded thread pool for background fetches, and the omission of certain SpanBuilder method overrides which could bypass the metadata logic. Additionally, it is recommended to improve error handling for background tasks and replace brittle sleep statements in tests with robust polling.
```java
java.lang.reflect.Field bgField =
    com.google.storage.v2.stub.GrpcStorageStub.class.getDeclaredField("backgroundResources");
bgField.setAccessible(true);
java.lang.Object bgAggregation = bgField.get(stub);

java.lang.reflect.Field listField =
    bgAggregation.getClass().getDeclaredField("backgroundResources");
listField.setAccessible(true);
java.util.List<?> resourcesList = (java.util.List<?>) listField.get(bgAggregation);

for (java.lang.Object res : resourcesList) {
  if (res instanceof com.google.api.gax.grpc.GrpcTransportChannel) {
    channel = ((com.google.api.gax.grpc.GrpcTransportChannel) res).getChannel();
    break;
  }
}
```
The use of reflection to access private fields (backgroundResources) within GrpcStorageStub and its internal aggregation classes is highly fragile. These internal implementation details are not part of the public API and are subject to change in any dependency update (e.g., gax or google-cloud-storage updates), which would lead to NoSuchFieldException or IllegalAccessException at runtime. Since this is a POC, it might be acceptable for now, but a production implementation should find a supported way to access the underlying channel or use the generated client's methods directly.
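If the reflective access has to stay for the POC, one way to contain the fragility is to isolate the lookup behind a helper that degrades gracefully when the internal layout changes. A minimal sketch, assuming the field names from the diff above; `Stub` is a local stand-in for `GrpcStorageStub` just to keep the sketch runnable:

```java
import java.lang.reflect.Field;

// Sketch: wrap the fragile reflective lookup so that a gax/google-cloud-storage
// upgrade that renames "backgroundResources" degrades to "no channel found"
// instead of crashing with NoSuchFieldException at runtime.
public class ReflectiveChannelLookup {

  // Local stand-in for GrpcStorageStub (an assumption for this sketch).
  static class Stub {
    private final String backgroundResources = "channel-holder";
  }

  /** Returns the field value, or null if the internal layout has changed. */
  static Object readPrivateField(Object target, String fieldName) {
    try {
      Field f = target.getClass().getDeclaredField(fieldName);
      f.setAccessible(true);
      return f.get(target);
    } catch (ReflectiveOperationException e) {
      // Internal layout changed (e.g. after a dependency bump): fall back gracefully.
      return null;
    }
  }

  public static void main(String[] args) {
    Object found = readPrivateField(new Stub(), "backgroundResources");
    Object missing = readPrivateField(new Stub(), "renamedInNewVersion");
    System.out.println(found + " / " + missing); // channel-holder / null
  }
}
```

The caller can then treat a `null` channel as "enrichment unavailable" rather than failing span creation, which matches the POC's best-effort intent.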
```java
Executors.newCachedThreadPool(
    r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);
      t.setName("gcs-aco-metadata-cache-pool");
      return t;
    });
```
Using Executors.newCachedThreadPool() creates an unbounded thread pool. This can lead to resource exhaustion (too many threads) if a large number of unique buckets are accessed simultaneously, especially since each fetch involves a blocking gRPC/HTTP call. Per the general rules, a bounded thread pool should be used instead.
```java
private final ExecutorService cacheExecutor =
    new ThreadPoolExecutor(
        0,
        20,
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        r -> {
          Thread t = new Thread(r);
          t.setDaemon(true);
          t.setName("gcs-aco-metadata-cache-pool");
          return t;
        });
```

References
- For safety, use a bounded thread pool (e.g., ThreadPoolExecutor with a defined maximum size) instead of an unbounded one (e.g., Executors.newCachedThreadPool()), even if the current logic seems to limit concurrent tasks.
```java
} catch (Exception e) {
}
```
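The empty catch block above swallows background-fetch failures silently. A sketch of the suggested error handling, assuming `java.util.logging` is acceptable here (the logger usage and log level are assumptions, not the PR's code):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: instead of `catch (Exception e) {}`, record background-fetch failures
// at FINE so transient errors stay diagnosable without spamming application logs.
public class BackgroundFetchLogging {
  private static final Logger logger =
      Logger.getLogger(BackgroundFetchLogging.class.getName());

  static void fetchMetadata(String bucketName, Runnable fetch) {
    try {
      fetch.run();
    } catch (Exception e) {
      // Swallowing keeps span creation non-fatal, but the failure is still visible.
      logger.log(Level.FINE, "Background metadata fetch failed for bucket " + bucketName, e);
    }
  }

  public static void main(String[] args) {
    fetchMetadata("my-bucket", () -> {
      throw new RuntimeException("simulated RPC failure");
    });
    System.out.println("caller not affected"); // the failure did not propagate
  }
}
```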
```java
public SpanBuilder setAttribute(String key, String value) {
  delegate.setAttribute(key, value);
  if ("gsutil.uri".equals(key) && value != null) {
    String name = OtelStorageDecorator.extractBucketName(value);
    if (name != null && !name.isEmpty()) {
      this.bucketName = name;
    }
  }
  return this;
}

@Override
public <T> SpanBuilder setAttribute(AttributeKey<T> key, T value) {
  delegate.setAttribute(key, value);
  if ("gsutil.uri".equals(key.getKey()) && value instanceof String) {
    String name = OtelStorageDecorator.extractBucketName((String) value);
    if (name != null && !name.isEmpty()) {
      this.bucketName = name;
    }
  }
  return this;
}
```
The AcoSpanBuilder wrapper is missing an implementation for setAllAttributes(Attributes attributes). If a user calls setAllAttributes instead of setAttribute, the bucketName extraction logic will be bypassed, and the background metadata fetch will not be triggered. Ensure all relevant SpanBuilder methods that can set attributes are overridden to check for the bucket URI.
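A sketch of what the missing override could look like. The real OpenTelemetry signature is `SpanBuilder setAllAttributes(Attributes attributes)`; here a `Map<String, Object>` stands in for `Attributes` so the sketch is self-contained and runnable, and the simplified `extractBucketName` mirrors (but is not) the PR's helper:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the missing setAllAttributes override: forward everything to the
// delegate, then run the same gsutil.uri check the per-key overrides perform.
public class AcoSpanBuilderSketch {
  String bucketName;
  private final Map<String, Object> delegate = new LinkedHashMap<>();

  // Simplified stand-in for OtelStorageDecorator.extractBucketName.
  static String extractBucketName(String gsutilUri) {
    if (!gsutilUri.startsWith("gs://")) return null;
    String rest = gsutilUri.substring("gs://".length());
    int slash = rest.indexOf('/');
    return slash < 0 ? rest : rest.substring(0, slash);
  }

  public AcoSpanBuilderSketch setAllAttributes(Map<String, Object> attributes) {
    delegate.putAll(attributes); // forward everything to the wrapped builder
    Object uri = attributes.get("gsutil.uri");
    if (uri instanceof String) { // same check as the setAttribute overrides
      String name = extractBucketName((String) uri);
      if (name != null && !name.isEmpty()) {
        this.bucketName = name;
      }
    }
    return this;
  }

  public static void main(String[] args) {
    AcoSpanBuilderSketch b = new AcoSpanBuilderSketch();
    b.setAllAttributes(Map.of("gsutil.uri", "gs://my-bucket/object.txt"));
    System.out.println(b.bucketName); // my-bucket
  }
}
```

In the real wrapper the same treatment would apply to any other `SpanBuilder` method that can carry attributes, so no code path bypasses the bucket extraction.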
```java
storage.getOptions().toBuilder().setOpenTelemetry(openTelemetrySdk).build();
try (Storage storage = storageOptions.getService()) {
  storage.create(BlobInfo.newBuilder(bucket, generator.randomObjectName()).build());
  Thread.sleep(800);
```
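The review recommends replacing the fixed `Thread.sleep(800)` with polling. A minimal sketch of such a helper, assuming the test can observe completion of the background fetch through some condition (the timeout values are illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Sketch: wait for a condition with a deadline and a short poll interval,
// instead of sleeping a fixed 800ms and hoping the background fetch finished.
public class AwaitCondition {

  static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true; // condition met: return immediately, no fixed sleep
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break; // stop waiting if the calling thread is interrupted
      }
    }
    return condition.getAsBoolean(); // one last check at the deadline
  }

  public static void main(String[] args) {
    AtomicBoolean metadataApplied = new AtomicBoolean(false);
    Thread fetcher = new Thread(() -> {
      try { Thread.sleep(50); } catch (InterruptedException ignored) {}
      metadataApplied.set(true); // simulates the background fetch completing
    });
    fetcher.start();
    System.out.println(await(metadataApplied::get, 2000, 10)); // true
  }
}
```

This keeps the test fast when the fetch completes quickly and robust when it is slow, instead of being both slow and flaky.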
/gemini review
Code Review
This pull request introduces a bucket metadata caching mechanism to enrich OpenTelemetry spans with resource attributes, specifically 'gcp.resource.destination.id' and 'gcp.resource.destination.location'. It adds new classes like AcoSpan, AcoSpanBuilder, and BucketMetadataCache, and implements the GetStorageLayout RPC for both gRPC and HTTP transports. Review feedback identifies several areas for improvement: the cache invalidation logic for 404 errors is currently too aggressive and may trigger redundant fetches; the use of an unbounded cached thread pool should be replaced with a bounded one for better resource management; and exceptions during background fetches are being swallowed without logging. Additionally, it is recommended to make the hardcoded cache capacity configurable and to use a more graceful shutdown sequence for the executor service.
```java
if (exception instanceof StorageException
    && parent != null
    && parent.delegate instanceof StorageInternal) {
  StorageException se = (StorageException) exception;
  if (se.getCode() == 404) {
    ((StorageInternal) parent.delegate).getBucketMetadataCache().remove(bucketName);
  }
}
```
The logic to remove the bucket from the metadata cache on any 404 error is too aggressive. A 404 status code can be returned for object-level operations (e.g., getObject) when the object is missing but the bucket still exists. This results in unnecessary cache invalidation and redundant background fetches. Consider checking if the error specifically indicates that the bucket itself is not found before invalidating the cache.
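A sketch of the narrower check. `Fake404` is a local stand-in for `StorageException`, and matching on the error message naming the bucket is an assumption that would need validating against real error payloads (the JSON API's error `reason`/message fields distinguish a missing bucket from a missing object):

```java
// Sketch: only invalidate the metadata cache when the 404 refers to the bucket
// itself, not to a missing object inside an existing bucket.
public class BucketNotFoundCheck {

  // Stand-in for StorageException, just to make the sketch runnable.
  static class Fake404 extends RuntimeException {
    final int code;
    Fake404(int code, String message) {
      super(message);
      this.code = code;
    }
  }

  static boolean shouldInvalidateCache(Fake404 e) {
    if (e.code != 404) {
      return false;
    }
    String msg = e.getMessage();
    // Assumption: a bucket-level 404 names the bucket in its error text.
    return msg != null && msg.toLowerCase().contains("bucket");
  }

  public static void main(String[] args) {
    System.out.println(
        shouldInvalidateCache(new Fake404(404, "The specified bucket does not exist"))); // true
    System.out.println(
        shouldInvalidateCache(new Fake404(404, "No such object: data/missing.txt"))); // false
  }
}
```

With this guard, object-level 404s from `getObject` no longer evict a perfectly valid bucket entry and re-trigger a background fetch.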
```java
}

private final ExecutorService cacheExecutor =
    Executors.newCachedThreadPool(
```
Using newCachedThreadPool for background metadata fetches can lead to an unbounded number of threads if many unique buckets are accessed simultaneously. As per safety rules, use a bounded thread pool (e.g., ThreadPoolExecutor with a defined maximum size) to ensure better resource management.
References
- For safety, use a bounded thread pool (e.g., ThreadPoolExecutor with a defined maximum size) instead of an unbounded one (e.g., Executors.newCachedThreadPool()), even if the current logic seems to limit concurrent tasks.
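One detail worth deciding alongside the bounded pool: with a `SynchronousQueue` and a maximum of 20 threads, the 21st concurrent fetch is rejected with a `RejectedExecutionException` by default. For an optional metadata prefetch, silently dropping the task may be acceptable (an assumption — the spans simply go unenriched for that bucket until the next attempt). A sketch with a small maximum for the demo:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a bounded pool whose saturation policy discards excess prefetch
// tasks instead of throwing into the caller's span-creation path.
public class BoundedFetchPool {

  static ThreadPoolExecutor newBoundedPool(int maxThreads) {
    return new ThreadPoolExecutor(
        0,
        maxThreads,
        60L,
        TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        r -> {
          Thread t = new Thread(r);
          t.setDaemon(true);
          t.setName("gcs-aco-metadata-cache-pool");
          return t;
        },
        new ThreadPoolExecutor.DiscardPolicy()); // drop instead of throw
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = newBoundedPool(2); // small max just for the demo
    for (int i = 0; i < 5; i++) {
      pool.execute(() -> {
        try { Thread.sleep(200); } catch (InterruptedException ignored) {}
      });
    }
    // At most 2 tasks were accepted; the rest were discarded, not thrown.
    System.out.println("threads in use: " + pool.getPoolSize()); // threads in use: 2
    pool.shutdownNow();
  }
}
```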
```java
} catch (Exception e) {
}
```
```java
if (delegate instanceof StorageInternal) {
  ((StorageInternal) delegate).getBucketMetadataCache().clear();
}
cacheExecutor.shutdownNow();
```
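The review above also recommends a more graceful shutdown than an immediate `shutdownNow()`. A sketch of the standard sequence (the 1-second grace period is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: stop accepting work, give in-flight fetches a short grace period,
// then force-interrupt whatever is still running.
public class GracefulShutdown {

  static void shutdownGracefully(ExecutorService executor) {
    executor.shutdown(); // no new tasks; running fetches may finish
    try {
      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
        executor.shutdownNow(); // grace period expired: interrupt stragglers
      }
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(() -> System.out.println("in-flight fetch completed"));
    shutdownGracefully(pool);
    System.out.println("terminated: " + pool.isTerminated());
  }
}
```

This avoids interrupting a fetch that is about to populate the cache while still guaranteeing the pool goes away when the client closes.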
```java
@SuppressWarnings("unchecked")
Map<String, Object> map = parser.parse(Map.class);
```
```java
if (bucketMetadataCache == null) {
  synchronized (cacheInitLock) {
    if (bucketMetadataCache == null) {
      bucketMetadataCache = new BucketMetadataCache(10000);
```
The cache capacity of 10000 is a magic number. To improve flexibility and follow repository guidelines, consider making this configurable via StorageOptions rather than using a hardcoded value or a static constant.
References
- Avoid declaring constants as 'static final' if there is a plan to expose or configure them at the instance level in the future.
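A sketch of what a configurable capacity could look like. The `setBucketMetadataCacheCapacity` builder method is hypothetical — in the real change it would live on `StorageOptions.Builder`; this standalone class only illustrates the shape:

```java
// Sketch: expose the cache capacity through a builder option instead of the
// hardcoded 10000, keeping 10000 only as the instance-level default.
public class CacheOptionsSketch {
  static final int DEFAULT_CAPACITY = 10_000; // today's hardcoded value

  private final int bucketMetadataCacheCapacity;

  private CacheOptionsSketch(int capacity) {
    this.bucketMetadataCacheCapacity = capacity;
  }

  static class Builder {
    private int capacity = DEFAULT_CAPACITY;

    // Hypothetical option, named by analogy with other StorageOptions setters.
    Builder setBucketMetadataCacheCapacity(int capacity) {
      if (capacity <= 0) {
        throw new IllegalArgumentException("capacity must be positive");
      }
      this.capacity = capacity;
      return this;
    }

    CacheOptionsSketch build() {
      return new CacheOptionsSketch(capacity);
    }
  }

  int getBucketMetadataCacheCapacity() {
    return bucketMetadataCacheCapacity;
  }

  public static void main(String[] args) {
    CacheOptionsSketch defaults = new Builder().build();
    CacheOptionsSketch tuned = new Builder().setBucketMetadataCacheCapacity(500).build();
    System.out.println(defaults.getBucketMetadataCacheCapacity()
        + " / " + tuned.getBucketMetadataCacheCapacity()); // 10000 / 500
  }
}
```

The `BucketMetadataCache` constructor would then receive `options.getBucketMetadataCacheCapacity()` instead of the literal.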
No description provided.