From 21e66a3557e87760bcd291cdcc5511cd3799457e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 09:59:21 +0000 Subject: [PATCH 01/13] add AsyncAbstractObjectStream this will be the parent class for AsyncReadObjectStream and AsyncWriteObjectStream --- .../asyncio/async_abstract_object_stream.py | 36 +++++++++++ .../test_async_abstract_object_stream.py | 64 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py create mode 100644 tests/unit/asyncio/test_async_abstract_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py new file mode 100644 index 000000000..02e72ffae --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -0,0 +1,36 @@ +import abc + + +class AsyncAbstractObjectStream(abc.ABC): + """ + Abstract class for both ReadObjectStream as well as WriteObjectStream. + + Attributes will include + 1. bucket_name + 2. object_name + 3. generation_number (if given) + + + """ + + def __init__(self, bucket_name, object_name, generation_number=None): + super().__init__() + self.bucket_name = bucket_name + self.object_name = object_name + self.generation_number = generation_number + + @abc.abstractmethod + async def open(self): + raise NotImplementedError("Subclasses should implement this method.") + + @abc.abstractmethod + async def close(self): + raise NotImplementedError("Subclasses should implement this method.") + + @abc.abstractmethod + async def send(self): + raise NotImplementedError("Subclasses should implement this method.") + + @abc.abstractmethod + async def recv(self): + raise NotImplementedError("Subclasses should implement this method.") diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py new file mode 100644 index 000000000..9679d729e --- /dev/null +++ b/tests/unit/asyncio/test_async_abstract_object_stream.py @@ -0,0 +1,64 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + AsyncAbstractObjectStream, +) + + +# A concrete implementation for testing purposes. +class _ConcreteStream(AsyncAbstractObjectStream): + async def open(self): + pass + + async def close(self): + pass + + async def send(self): + pass + + async def recv(self): + pass + + +def test_init(): + """Test the constructor of AsyncAbstractObjectStream.""" + bucket_name = "test-bucket" + object_name = "test-object" + generation = 12345 + + # Test with all parameters + stream = _ConcreteStream(bucket_name, object_name, generation_number=generation) + assert stream.bucket_name == bucket_name + assert stream.object_name == object_name + assert stream.generation_number == generation + + # Test with default generation_number + stream_no_gen = _ConcreteStream(bucket_name, object_name) + assert stream_no_gen.bucket_name == bucket_name + assert stream_no_gen.object_name == object_name + assert stream_no_gen.generation_number is None + + +def test_instantiation_fails_without_implementation(): + """Test that instantiating an incomplete subclass raises TypeError.""" + + class _IncompleteStream(AsyncAbstractObjectStream): + # Missing implementations for abstract methods like open(), close(), etc. + pass + + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + _IncompleteStream("bucket", "object") From 39503f49553daa611cfe0a3425fb487b502dca29 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 10:22:06 +0000 Subject: [PATCH 02/13] keep _AsyncAbstractObjectStream private --- .../_experimental/asyncio/async_abstract_object_stream.py | 2 +- tests/unit/asyncio/test_async_abstract_object_stream.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 02e72ffae..86de7f715 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -1,7 +1,7 @@ import abc -class AsyncAbstractObjectStream(abc.ABC): +class _AsyncAbstractObjectStream(abc.ABC): """ Abstract class for both ReadObjectStream as well as WriteObjectStream. diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py index 9679d729e..e0dc130ea 100644 --- a/tests/unit/asyncio/test_async_abstract_object_stream.py +++ b/tests/unit/asyncio/test_async_abstract_object_stream.py @@ -15,12 +15,12 @@ import pytest from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( - AsyncAbstractObjectStream, + _AsyncAbstractObjectStream, ) # A concrete implementation for testing purposes. -class _ConcreteStream(AsyncAbstractObjectStream): +class _ConcreteStream(_AsyncAbstractObjectStream): async def open(self): pass @@ -56,7 +56,7 @@ def test_init(): def test_instantiation_fails_without_implementation(): """Test that instantiating an incomplete subclass raises TypeError.""" - class _IncompleteStream(AsyncAbstractObjectStream): + class _IncompleteStream(_AsyncAbstractObjectStream): # Missing implementations for abstract methods like open(), close(), etc. pass From a161fd0449375866c71f7db168def60625c9d748 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 10:46:07 +0000 Subject: [PATCH 03/13] Add _AsyncReadObjectStream and it's stubs --- .../asyncio/async_read_object_stream.py | 80 ++++++ .../_experimental/asyncio/bidi_async.py | 230 ++++++++++++++++++ .../_experimental/asyncio/bidi_base.py | 80 ++++++ .../asyncio/test_async_read_object_stream.py | 77 ++++++ 4 files changed, 467 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_read_object_stream.py create mode 100644 google/cloud/storage/_experimental/asyncio/bidi_async.py create mode 100644 google/cloud/storage/_experimental/asyncio/bidi_base.py create mode 100644 tests/unit/asyncio/test_async_read_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py new file mode 100644 index 000000000..078434073 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -0,0 +1,80 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is _experimental module for upcoming support for Rapid Storage. +(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intented and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM(Technical Account Manager) +if you want to use these APIs. + +""" + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) + + +class _AsyncReadObjectStream(_AsyncAbstractObjectStream): + """Provides an asynchronous, streaming interface for reading from a GCS object. + + This class provides a unix socket-like interface to a GCS Object, with + methods like ``open``, ``close``, ``send``, and ``recv``. + + :type client: :class:`~google.cloud.storage.aio.Client` + :param client: The asynchronous client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the bucket containing the object. + + :type object_name: str + :param object_name: The name of the object to be read. + + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. + + :type read_handle: object + :param read_handle: (Optional) An existing handle for reading the object. + If provided, opening the bidi-gRPC connection will be faster. + """ + + def __init__( + self, + client, + bucket_name=None, + object_name=None, + generation_number=None, + read_handle=None, + ): + super().__init__( + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation_number, + ) + self.client = client + self.read_handle = read_handle + + async def open(self) -> None: + pass + + async def close(self): + pass + + async def send(self, bidi_read_object_request): + pass + + async def recv(self): + pass diff --git a/google/cloud/storage/_experimental/asyncio/bidi_async.py b/google/cloud/storage/_experimental/asyncio/bidi_async.py new file mode 100644 index 000000000..8c5e58fd0 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/bidi_async.py @@ -0,0 +1,230 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Asynchronous bi-directional streaming RPC helpers.""" + +import asyncio +import logging + +from google.api_core import exceptions +from google.cloud.storage._experimental.asyncio.bidi_base import BidiRpcBase + +_LOGGER = logging.getLogger(__name__) + + +class _AsyncRequestQueueGenerator: + """_AsyncRequestQueueGenerator is a helper class for sending asynchronous + requests to a gRPC stream from a Queue. + + This generator takes asynchronous requests off a given queue and yields them + to gRPC. + + This helper is useful when you have an indeterminate, indefinite, or + otherwise open-ended set of requests to send through a request-streaming + (or bidirectional) RPC. + + The reason this is necessary + + is because it's let's user have control on the when they would want to + send requests proto messages instead of sending all of them initilally. + + This is achieved via asynchronous queue (asyncio.Queue), + gRPC awaits until there's a message in the queue. + + Finally, it allows for retrying without swapping queues because if it does + pull an item off the queue when the RPC is inactive, it'll immediately put + it back and then exit. This is necessary because yielding the item in this + case will cause gRPC to discard it. In practice, this means that the order + of messages is not guaranteed. If such a thing is necessary it would be + easy to use a priority queue. + + Example:: + + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call + + async for response in call: + print(response) + await q.put(...) + + Args: + queue (asyncio.Queue): The request queue. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. + """ + + def __init__(self, queue: asyncio.Queue, initial_request=None): + self._queue = queue + self._initial_request = initial_request + self.call = None + + def _is_active(self): + """ + Returns true if the call is not set or not completed. + """ + return self.call is None or not self.call.done() + + async def __aiter__(self): + if self._initial_request is not None: + if callable(self._initial_request): + yield self._initial_request() + else: + yield self._initial_request + + while True: + item = await self._queue.get() + + # The consumer explicitly sent "None", indicating that the request + # should end. + if item is None: + _LOGGER.debug("Cleanly exiting request generator.") + return + + if not self._is_active(): + # We have an item, but the call is closed. We should put the + # item back on the queue so that the next call can consume it. + await self._queue.put(item) + _LOGGER.debug( + "Inactive call, replacing item on queue and exiting " + "request generator." + ) + return + + yield item + + +class AsyncBidiRpc(BidiRpcBase): + """A helper for consuming a async bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + Example:: + + initial_request = example_pb2.StreamingRpcRequest( + setting='example') + rpc = AsyncBidiRpc( + stub.StreamingRpc, + initial_request=initial_request, + metadata=[('name', 'value')] + ) + + await rpc.open() + + while rpc.is_active: + print(await rpc.recv()) + await rpc.send(example_pb2.StreamingRpcRequest( + data='example')) + + This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. + + Args: + start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to + start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def _create_queue(self): + """Create a queue for requests.""" + return asyncio.Queue() + + async def open(self): + """Opens the stream.""" + if self.is_active: + raise ValueError("Can not open an already open stream.") + + request_generator = _AsyncRequestQueueGenerator( + self._request_queue, initial_request=self._initial_request + ) + try: + call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) + except exceptions.GoogleAPICallError as exc: + # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is + # available from the ``response`` property on the mapped exception. + self._on_call_done(exc.response) + raise + + request_generator.call = call + + # TODO: api_core should expose the future interface for wrapped + # callables as well. + if hasattr(call, "_wrapped"): # pragma: NO COVER + call._wrapped.add_done_callback(self._on_call_done) + else: + call.add_done_callback(self._on_call_done) + + self._request_generator = request_generator + self.call = call + + async def close(self): + """Closes the stream.""" + if self.call is None: + return + + await self._request_queue.put(None) + self.call.cancel() + self._request_generator = None + self._initial_request = None + self._callbacks = [] + # Don't set self.call to None. Keep it around so that send/recv can + # raise the error. + + async def send(self, request): + """Queue a message to be sent on the stream. + + If the underlying RPC has been closed, this will raise. + + Args: + request (protobuf.Message): The request to send. + """ + if self.call is None: + raise ValueError("Can not send() on an RPC that has never been opened.") + + # Don't use self.is_active(), as ResumableBidiRpc will overload it + # to mean something semantically different. + if not self.call.done(): + await self._request_queue.put(request) + else: + # calling read should cause the call to raise. + await self.call.read() + + async def recv(self): + """Wait for a message to be returned from the stream. + + If the underlying RPC has been closed, this will raise. + + Returns: + protobuf.Message: The received message. + """ + if self.call is None: + raise ValueError("Can not recv() on an RPC that has never been opened.") + + return await self.call.read() + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + return self.call is not None and not self.call.done() diff --git a/google/cloud/storage/_experimental/asyncio/bidi_base.py b/google/cloud/storage/_experimental/asyncio/bidi_base.py new file mode 100644 index 000000000..195e35750 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/bidi_base.py @@ -0,0 +1,80 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may obtain a copy of the License at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for bi-directional streaming RPC helpers.""" + + +class BidiRpcBase: + """A base class for consuming a bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + This does *not* retry the stream on errors. + + Args: + start_rpc (Union[grpc.StreamStreamMultiCallable, + grpc.aio.StreamStreamMultiCallable]): The gRPC method used + to start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def __init__(self, start_rpc, initial_request=None, metadata=None): + self._start_rpc = start_rpc + self._initial_request = initial_request + self._rpc_metadata = metadata + self._request_queue = self._create_queue() + self._request_generator = None + self._callbacks = [] + self.call = None + + def _create_queue(self): + """Create a queue for requests.""" + raise NotImplementedError("`_create_queue` is not implemented.") + + def add_done_callback(self, callback): + """Adds a callback that will be called when the RPC terminates. + + This occurs when the RPC errors or is successfully terminated. + + Args: + callback (Callable[[grpc.Future], None]): The callback to execute. + It will be provided with the same gRPC future as the underlying + stream which will also be a :class:`grpc.aio.Call`. + """ + self._callbacks.append(callback) + + def _on_call_done(self, future): + # This occurs when the RPC errors or is successfully terminated. + # Note that grpc's "future" here can also be a grpc.RpcError. + # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 + # that `grpc.RpcError` is also `grpc.aio.Call`. + for callback in self._callbacks: + callback(future) + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + raise NotImplementedError("`is_active` is not implemented.") + + @property + def pending_requests(self): + """int: Returns an estimate of the number of queued requests.""" + return self._request_queue.qsize() diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py new file mode 100644 index 000000000..43b42f8d8 --- /dev/null +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -0,0 +1,77 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) +from google.cloud.storage._experimental.asyncio.async_read_object_stream import ( + _AsyncReadObjectStream, +) + + +def test_inheritance(): + """Test that _AsyncReadObjectStream inherits from _AsyncAbstractObjectStream.""" + assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) + + +def test_init(): + """Test the constructor of _AsyncReadObjectStream.""" + mock_client = mock.Mock(name="client") + bucket_name = "test-bucket" + object_name = "test-object" + generation = 12345 + read_handle = "some-handle" + + # Test with all parameters + stream = _AsyncReadObjectStream( + mock_client, + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation, + read_handle=read_handle, + ) + + assert stream.client is mock_client + assert stream.bucket_name == bucket_name + assert stream.object_name == object_name + assert stream.generation_number == generation + assert stream.read_handle == read_handle + + # Test with default parameters + stream_defaults = _AsyncReadObjectStream(mock_client) + assert stream_defaults.client is mock_client + assert stream_defaults.bucket_name is None + assert stream_defaults.object_name is None + assert stream_defaults.generation_number is None + assert stream_defaults.read_handle is None + + +@pytest.mark.asyncio +async def test_async_methods_are_awaitable(): + """Test that the async methods exist and are awaitable.""" + mock_client = mock.Mock(name="client") + stream = _AsyncReadObjectStream(mock_client) + + # These methods are currently empty, but we can test they are awaitable + # and don't raise exceptions. + try: + await stream.open() + await stream.close() + await stream.send(mock.Mock()) + await stream.recv() + except Exception as e: + pytest.fail(f"Async methods should be awaitable without errors. Raised: {e}") From 0810afc315e29a4f9caa25c08faa7840281de545 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:00:17 +0000 Subject: [PATCH 04/13] fix doc strings, add licence and type hints --- .../asyncio/async_abstract_object_stream.py | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 86de7f715..03e2c5690 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -1,36 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import abc +from typing import Any, Optional class _AsyncAbstractObjectStream(abc.ABC): - """ - Abstract class for both ReadObjectStream as well as WriteObjectStream. + """Abstract base class for asynchronous object streams. + + This class defines the common interface for both reading from and writing + to a GCS object in a streaming fashion. - Attributes will include - 1. bucket_name - 2. object_name - 3. generation_number (if given) + :type bucket_name: str + :param bucket_name: (Optional) The name of the bucket containing the object. + :type object_name: str + :param object_name: (Optional) The name of the object. + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. """ - def __init__(self, bucket_name, object_name, generation_number=None): + def __init__( + self, + bucket_name: Optional[str] = None, + object_name: Optional[str] = None, + generation_number: Optional[int] = None, + ) -> None: super().__init__() - self.bucket_name = bucket_name - self.object_name = object_name - self.generation_number = generation_number + self.bucket_name: Optional[str] = bucket_name + self.object_name: Optional[str] = object_name + self.generation_number: Optional[int] = generation_number @abc.abstractmethod - async def open(self): + async def open(self) -> None: raise NotImplementedError("Subclasses should implement this method.") @abc.abstractmethod - async def close(self): + async def close(self) -> None: raise NotImplementedError("Subclasses should implement this method.") @abc.abstractmethod - async def send(self): + async def send(self, message: Any) -> None: raise NotImplementedError("Subclasses should implement this method.") @abc.abstractmethod - async def recv(self): + async def recv(self) -> Any: raise NotImplementedError("Subclasses should implement this method.") From a14bc6895ad704beb57438b55ede8f6487b50f9e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:16:30 +0000 Subject: [PATCH 05/13] pass abstract methods --- .../_experimental/asyncio/async_abstract_object_stream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 03e2c5690..752a058c1 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -46,16 +46,16 @@ def __init__( @abc.abstractmethod async def open(self) -> None: - raise NotImplementedError("Subclasses should implement this method.") + pass @abc.abstractmethod async def close(self) -> None: - raise NotImplementedError("Subclasses should implement this method.") + pass @abc.abstractmethod async def send(self, message: Any) -> None: - raise NotImplementedError("Subclasses should implement this method.") + pass @abc.abstractmethod async def recv(self) -> Any: - raise NotImplementedError("Subclasses should implement this method.") + pass From 635ad07ea962f0bb0df491ac57631d167be29369 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:29:33 +0000 Subject: [PATCH 06/13] add handle param --- .../_experimental/asyncio/async_abstract_object_stream.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 752a058c1..28a92ccb4 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -31,6 +31,10 @@ class _AsyncAbstractObjectStream(abc.ABC): :type generation_number: int :param generation_number: (Optional) If present, selects a specific revision of this object. + + :type handle: bytes + :param handle: (Optional) The handle for the object, could be read_handle or + write_handle, based on how the stream is used. """ def __init__( @@ -38,11 +42,13 @@ def __init__( bucket_name: Optional[str] = None, object_name: Optional[str] = None, generation_number: Optional[int] = None, + handle: Optional[bytes] = None, ) -> None: super().__init__() self.bucket_name: Optional[str] = bucket_name self.object_name: Optional[str] = object_name self.generation_number: Optional[int] = generation_number + self.handle: Optional[bytes] = handle @abc.abstractmethod async def open(self) -> None: From ba453d4617a6c197078e9a3783730d48d5e3b1a3 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:36:07 +0000 Subject: [PATCH 07/13] include handle in tests --- tests/unit/asyncio/test_async_abstract_object_stream.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py index e0dc130ea..d81c9ef56 100644 --- a/tests/unit/asyncio/test_async_abstract_object_stream.py +++ b/tests/unit/asyncio/test_async_abstract_object_stream.py @@ -39,18 +39,23 @@ def test_init(): bucket_name = "test-bucket" object_name = "test-object" generation = 12345 + handle = b"test-handle" # Test with all parameters - stream = _ConcreteStream(bucket_name, object_name, generation_number=generation) + stream = _ConcreteStream( + bucket_name, object_name, generation_number=generation, handle=handle + ) assert stream.bucket_name == bucket_name assert stream.object_name == object_name assert stream.generation_number == generation + assert stream.handle == handle # Test with default generation_number stream_no_gen = _ConcreteStream(bucket_name, object_name) assert stream_no_gen.bucket_name == bucket_name assert stream_no_gen.object_name == object_name assert stream_no_gen.generation_number is None + assert stream_no_gen.handle is None def test_instantiation_fails_without_implementation(): From 800c6df51e19123b022b9aa336cda59f939444bf Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 14:59:47 +0000 Subject: [PATCH 08/13] remove unit tests for abstract class --- .../asyncio/async_abstract_object_stream.py | 8 +-- .../test_async_abstract_object_stream.py | 69 ------------------- 2 files changed, 4 insertions(+), 73 deletions(-) delete mode 100644 tests/unit/asyncio/test_async_abstract_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 28a92ccb4..325089ba5 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -17,10 +17,10 @@ class _AsyncAbstractObjectStream(abc.ABC): - """Abstract base class for asynchronous object streams. + """Abstract base class to represent gRPC stream for GCS ``Object``. - This class defines the common interface for both reading from and writing - to a GCS object in a streaming fashion. + Concrete implementation of this class could be ``_AsyncReadObjectStream`` + or ``_AsyncWriteObjectStream``. :type bucket_name: str :param bucket_name: (Optional) The name of the bucket containing the object. @@ -59,7 +59,7 @@ async def close(self) -> None: pass @abc.abstractmethod - async def send(self, message: Any) -> None: + async def send(self, protobuf: Any) -> None: pass @abc.abstractmethod diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py deleted file mode 100644 index d81c9ef56..000000000 --- a/tests/unit/asyncio/test_async_abstract_object_stream.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( - _AsyncAbstractObjectStream, -) - - -# A concrete implementation for testing purposes. -class _ConcreteStream(_AsyncAbstractObjectStream): - async def open(self): - pass - - async def close(self): - pass - - async def send(self): - pass - - async def recv(self): - pass - - -def test_init(): - """Test the constructor of AsyncAbstractObjectStream.""" - bucket_name = "test-bucket" - object_name = "test-object" - generation = 12345 - handle = b"test-handle" - - # Test with all parameters - stream = _ConcreteStream( - bucket_name, object_name, generation_number=generation, handle=handle - ) - assert stream.bucket_name == bucket_name - assert stream.object_name == object_name - assert stream.generation_number == generation - assert stream.handle == handle - - # Test with default generation_number - stream_no_gen = _ConcreteStream(bucket_name, object_name) - assert stream_no_gen.bucket_name == bucket_name - assert stream_no_gen.object_name == object_name - assert stream_no_gen.generation_number is None - assert stream_no_gen.handle is None - - -def test_instantiation_fails_without_implementation(): - """Test that instantiating an incomplete subclass raises TypeError.""" - - class _IncompleteStream(_AsyncAbstractObjectStream): - # Missing implementations for abstract methods like open(), close(), etc. - pass - - with pytest.raises(TypeError, match="Can't instantiate abstract class"): - _IncompleteStream("bucket", "object") From 18529adc0d2765721ccaa1836ef5080b775602fd Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 15:23:31 +0000 Subject: [PATCH 09/13] edit doc string for _AsyncReadObjectStream --- .../asyncio/async_read_object_stream.py | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 078434073..a3818d2fa 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -22,59 +22,64 @@ """ +from typing import Any, Optional +from google.cloud import _storage_v2 +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, ) class _AsyncReadObjectStream(_AsyncAbstractObjectStream): - """Provides an asynchronous, streaming interface for reading from a GCS object. + """Class representing a gRPC bidi-stream for reading data from a GCS ``Object``. - This class provides a unix socket-like interface to a GCS Object, with + This class provides a unix socket-like interface to a GCS ``Object``, with methods like ``open``, ``close``, ``send``, and ``recv``. - :type client: :class:`~google.cloud.storage.aio.Client` - :param client: The asynchronous client to use for making API requests. + :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient` + :param client: async grpc client to use for making API requests. :type bucket_name: str - :param bucket_name: The name of the bucket containing the object. + :param bucket_name: The name of the GCS ``bucket`` containing the object. :type object_name: str - :param object_name: The name of the object to be read. + :param object_name: The name of the GCS ``object`` to be read. :type generation_number: int :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: object + :type read_handle: bytes :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. """ def __init__( self, - client, - bucket_name=None, - object_name=None, - generation_number=None, - read_handle=None, - ): + client: AsyncGrpcClient, + bucket_name: Optional[str] = None, + object_name: Optional[str] = None, + generation_number: Optional[int] = None, + read_handle: Optional[bytes] = None, + ) -> None: super().__init__( bucket_name=bucket_name, object_name=object_name, generation_number=generation_number, ) - self.client = client - self.read_handle = read_handle + self.client: AsyncGrpcClient = client + self.read_handle: Optional[bytes] = read_handle async def open(self) -> None: pass - async def close(self): + async def close(self) -> None: pass - async def send(self, bidi_read_object_request): + async def send( + self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest + ) -> None: pass - async def recv(self): + async def recv(self) -> Any: pass From 6dec6c692779e26ddcf30588375f4d0260a7102d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 18:31:13 +0000 Subject: [PATCH 10/13] bucket_name and object_name cannot be NONE --- .../_experimental/asyncio/async_abstract_object_stream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 325089ba5..1ba5aef9b 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -39,14 +39,14 @@ class _AsyncAbstractObjectStream(abc.ABC): def __init__( self, - bucket_name: Optional[str] = None, - object_name: Optional[str] = None, + bucket_name: str, + object_name: str, generation_number: Optional[int] = None, handle: Optional[bytes] = None, ) -> None: super().__init__() - self.bucket_name: Optional[str] = bucket_name - self.object_name: Optional[str] = object_name + self.bucket_name: str = bucket_name + self.object_name: str = object_name self.generation_number: Optional[int] = generation_number self.handle: Optional[bytes] = handle From a15490527a4df40dbc89e631f2b4cee6acfda694 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 18:35:32 +0000 Subject: [PATCH 11/13] bucket_name and object_name cannot be None --- .../_experimental/asyncio/async_read_object_stream.py | 4 ++-- tests/unit/asyncio/test_async_read_object_stream.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index a3818d2fa..96639e0df 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -57,8 +57,8 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): def __init__( self, client: AsyncGrpcClient, - bucket_name: Optional[str] = None, - object_name: Optional[str] = None, + bucket_name: str, + object_name: str, generation_number: Optional[int] = None, read_handle: Optional[bytes] = None, ) -> None: diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 43b42f8d8..a033851cc 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -52,10 +52,12 @@ def test_init(): assert stream.read_handle == read_handle # Test with default parameters - stream_defaults = _AsyncReadObjectStream(mock_client) + stream_defaults = _AsyncReadObjectStream( + mock_client, bucket_name=bucket_name, object_name=object_name + ) assert stream_defaults.client is mock_client - assert stream_defaults.bucket_name is None - assert stream_defaults.object_name is None + assert stream_defaults.bucket_name is bucket_name + assert stream_defaults.object_name is object_name assert stream_defaults.generation_number is None assert stream_defaults.read_handle is None @@ -64,7 +66,7 @@ def test_init(): async def test_async_methods_are_awaitable(): """Test that the async methods exist and are awaitable.""" mock_client = mock.Mock(name="client") - stream = _AsyncReadObjectStream(mock_client) + stream = _AsyncReadObjectStream(mock_client, "bucket", "object") # These methods are currently empty, but we can test they are awaitable # and don't raise exceptions. From 2054989c7b83ba86d2461e046c972d61e1cbbe49 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 19 Sep 2025 06:26:06 +0000 Subject: [PATCH 12/13] minor edit - add bidi-stream in doc string --- .../_experimental/asyncio/async_abstract_object_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 1ba5aef9b..49d7a293a 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -17,7 +17,7 @@ class _AsyncAbstractObjectStream(abc.ABC): - """Abstract base class to represent gRPC stream for GCS ``Object``. + """Abstract base class to represent gRPC bidi-stream for GCS ``Object``. Concrete implementation of this class could be ``_AsyncReadObjectStream`` or ``_AsyncWriteObjectStream``. From 0e60694e0b3f2cdab140b878dcf4d1c686bbe90a Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 19 Sep 2025 11:05:00 +0000 Subject: [PATCH 13/13] add checks for invalid inputs --- .../asyncio/async_read_object_stream.py | 15 +++++++++++---- .../unit/asyncio/test_async_read_object_stream.py | 7 +++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 96639e0df..bedfbf7ba 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -16,7 +16,7 @@ This is _experimental module for upcoming support for Rapid Storage. (https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) -APIs may not work as intented and are not stable yet. Feature is not +APIs may not work as intended and are not stable yet. Feature is not GA(Generally Available) yet, please contact your TAM(Technical Account Manager) if you want to use these APIs. @@ -36,7 +36,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): This class provides a unix socket-like interface to a GCS ``Object``, with methods like ``open``, ``close``, ``send``, and ``recv``. - :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient` + :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient.grpc_client` :param client: async grpc client to use for making API requests. :type bucket_name: str @@ -56,18 +56,25 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): def __init__( self, - client: AsyncGrpcClient, + client: AsyncGrpcClient.grpc_client, bucket_name: str, object_name: str, generation_number: Optional[int] = None, read_handle: Optional[bytes] = None, ) -> None: + if client is None: + raise ValueError("client must be provided") + if bucket_name is None: + raise ValueError("bucket_name must be provided") + if object_name is None: + raise ValueError("object_name must be provided") + super().__init__( bucket_name=bucket_name, object_name=object_name, generation_number=generation_number, ) - self.client: AsyncGrpcClient = client + self.client: AsyncGrpcClient.grpc_client = client self.read_handle: Optional[bytes] = read_handle async def open(self) -> None: diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index a033851cc..89d0571b0 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -62,6 +62,13 @@ def test_init(): assert stream_defaults.read_handle is None +def test_init_with_invalid_parameters(): + """Test the constructor of _AsyncReadObjectStream with invalid params.""" + + with pytest.raises(ValueError): + _AsyncReadObjectStream(None, bucket_name=None, object_name=None) + + @pytest.mark.asyncio async def test_async_methods_are_awaitable(): """Test that the async methods exist and are awaitable."""