From 21e66a3557e87760bcd291cdcc5511cd3799457e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 09:59:21 +0000 Subject: [PATCH 01/24] add AsyncAbstractObjectStream this will be the parent class for AsyncReadObjectStream and AsyncWriteObjectStream --- .../asyncio/async_abstract_object_stream.py | 36 +++++++++++ .../test_async_abstract_object_stream.py | 64 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py create mode 100644 tests/unit/asyncio/test_async_abstract_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py new file mode 100644 index 000000000..02e72ffae --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -0,0 +1,36 @@ +import abc + + +class AsyncAbstractObjectStream(abc.ABC): + """ + Abstract class for both ReadObjectStream as well as WriteObjectStream. + + Attributes will include + 1. bucket_name + 2. object_name + 3. 
generation_number (if given) + + + """ + + def __init__(self, bucket_name, object_name, generation_number=None): + super().__init__() + self.bucket_name = bucket_name + self.object_name = object_name + self.generation_number = generation_number + + @abc.abstractmethod + async def open(self): + raise NotImplementedError("Subclasses should implement this method.") + + @abc.abstractmethod + async def close(self): + raise NotImplementedError("Subclasses should implement this method.") + + @abc.abstractmethod + async def send(self): + raise NotImplementedError("Subclasses should implement this method.") + + @abc.abstractmethod + async def recv(self): + raise NotImplementedError("Subclasses should implement this method.") diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py new file mode 100644 index 000000000..9679d729e --- /dev/null +++ b/tests/unit/asyncio/test_async_abstract_object_stream.py @@ -0,0 +1,64 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + AsyncAbstractObjectStream, +) + + +# A concrete implementation for testing purposes. 
+class _ConcreteStream(AsyncAbstractObjectStream): + async def open(self): + pass + + async def close(self): + pass + + async def send(self): + pass + + async def recv(self): + pass + + +def test_init(): + """Test the constructor of AsyncAbstractObjectStream.""" + bucket_name = "test-bucket" + object_name = "test-object" + generation = 12345 + + # Test with all parameters + stream = _ConcreteStream(bucket_name, object_name, generation_number=generation) + assert stream.bucket_name == bucket_name + assert stream.object_name == object_name + assert stream.generation_number == generation + + # Test with default generation_number + stream_no_gen = _ConcreteStream(bucket_name, object_name) + assert stream_no_gen.bucket_name == bucket_name + assert stream_no_gen.object_name == object_name + assert stream_no_gen.generation_number is None + + +def test_instantiation_fails_without_implementation(): + """Test that instantiating an incomplete subclass raises TypeError.""" + + class _IncompleteStream(AsyncAbstractObjectStream): + # Missing implementations for abstract methods like open(), close(), etc. 
+ pass + + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + _IncompleteStream("bucket", "object") From 39503f49553daa611cfe0a3425fb487b502dca29 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 10:22:06 +0000 Subject: [PATCH 02/24] keep _AsyncAbstractObjectStream private --- .../_experimental/asyncio/async_abstract_object_stream.py | 2 +- tests/unit/asyncio/test_async_abstract_object_stream.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 02e72ffae..86de7f715 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -1,7 +1,7 @@ import abc -class AsyncAbstractObjectStream(abc.ABC): +class _AsyncAbstractObjectStream(abc.ABC): """ Abstract class for both ReadObjectStream as well as WriteObjectStream. diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py index 9679d729e..e0dc130ea 100644 --- a/tests/unit/asyncio/test_async_abstract_object_stream.py +++ b/tests/unit/asyncio/test_async_abstract_object_stream.py @@ -15,12 +15,12 @@ import pytest from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( - AsyncAbstractObjectStream, + _AsyncAbstractObjectStream, ) # A concrete implementation for testing purposes. 
-class _ConcreteStream(AsyncAbstractObjectStream): +class _ConcreteStream(_AsyncAbstractObjectStream): async def open(self): pass @@ -56,7 +56,7 @@ def test_init(): def test_instantiation_fails_without_implementation(): """Test that instantiating an incomplete subclass raises TypeError.""" - class _IncompleteStream(AsyncAbstractObjectStream): + class _IncompleteStream(_AsyncAbstractObjectStream): # Missing implementations for abstract methods like open(), close(), etc. pass From a161fd0449375866c71f7db168def60625c9d748 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 10:46:07 +0000 Subject: [PATCH 03/24] Add _AsyncReadObjectStream and it's stubs --- .../asyncio/async_read_object_stream.py | 80 ++++++ .../_experimental/asyncio/bidi_async.py | 230 ++++++++++++++++++ .../_experimental/asyncio/bidi_base.py | 80 ++++++ .../asyncio/test_async_read_object_stream.py | 77 ++++++ 4 files changed, 467 insertions(+) create mode 100644 google/cloud/storage/_experimental/asyncio/async_read_object_stream.py create mode 100644 google/cloud/storage/_experimental/asyncio/bidi_async.py create mode 100644 google/cloud/storage/_experimental/asyncio/bidi_base.py create mode 100644 tests/unit/asyncio/test_async_read_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py new file mode 100644 index 000000000..078434073 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -0,0 +1,80 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NOTE: +This is an _experimental module for upcoming support for Rapid Storage. +(https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) + +APIs may not work as intended and are not stable yet. Feature is not +GA(Generally Available) yet, please contact your TAM(Technical Account Manager) +if you want to use these APIs. + +""" + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) + + +class _AsyncReadObjectStream(_AsyncAbstractObjectStream): + """Provides an asynchronous, streaming interface for reading from a GCS object. + + This class provides a unix socket-like interface to a GCS Object, with + methods like ``open``, ``close``, ``send``, and ``recv``. + + :type client: :class:`~google.cloud.storage.aio.Client` + :param client: The asynchronous client to use for making API requests. + + :type bucket_name: str + :param bucket_name: The name of the bucket containing the object. + + :type object_name: str + :param object_name: The name of the object to be read. + + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. + + :type read_handle: object + :param read_handle: (Optional) An existing handle for reading the object. + If provided, opening the bidi-gRPC connection will be faster. 
+ """ + + def __init__( + self, + client, + bucket_name=None, + object_name=None, + generation_number=None, + read_handle=None, + ): + super().__init__( + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation_number, + ) + self.client = client + self.read_handle = read_handle + + async def open(self) -> None: + pass + + async def close(self): + pass + + async def send(self, bidi_read_object_request): + pass + + async def recv(self): + pass diff --git a/google/cloud/storage/_experimental/asyncio/bidi_async.py b/google/cloud/storage/_experimental/asyncio/bidi_async.py new file mode 100644 index 000000000..8c5e58fd0 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/bidi_async.py @@ -0,0 +1,230 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Asynchronous bi-directional streaming RPC helpers.""" + +import asyncio +import logging + +from google.api_core import exceptions +from google.cloud.storage._experimental.asyncio.bidi_base import BidiRpcBase + +_LOGGER = logging.getLogger(__name__) + + +class _AsyncRequestQueueGenerator: + """_AsyncRequestQueueGenerator is a helper class for sending asynchronous + requests to a gRPC stream from a Queue. + + This generator takes asynchronous requests off a given queue and yields them + to gRPC. 
+ + This helper is useful when you have an indeterminate, indefinite, or + otherwise open-ended set of requests to send through a request-streaming + (or bidirectional) RPC. + + The reason this is necessary + + is because it lets the user control when they want to + send request proto messages instead of sending all of them initially. + + This is achieved via an asynchronous queue (asyncio.Queue), + gRPC awaits until there's a message in the queue. + + Finally, it allows for retrying without swapping queues because if it does + pull an item off the queue when the RPC is inactive, it'll immediately put + it back and then exit. This is necessary because yielding the item in this + case will cause gRPC to discard it. In practice, this means that the order + of messages is not guaranteed. If such a thing is necessary it would be + easy to use a priority queue. + + Example:: + + requests = _AsyncRequestQueueGenerator(q) + call = await stub.StreamingRequest(requests) + requests.call = call + + async for response in call: + print(response) + await q.put(...) + + Args: + queue (asyncio.Queue): The request queue. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is done independently of the request queue to allow for + easily restarting streams that require some initial configuration + request. + """ + + def __init__(self, queue: asyncio.Queue, initial_request=None): + self._queue = queue + self._initial_request = initial_request + self.call = None + + def _is_active(self): + """ + Returns true if the call is not set or not completed. 
+ """ + return self.call is None or not self.call.done() + + async def __aiter__(self): + if self._initial_request is not None: + if callable(self._initial_request): + yield self._initial_request() + else: + yield self._initial_request + + while True: + item = await self._queue.get() + + # The consumer explicitly sent "None", indicating that the request + # should end. + if item is None: + _LOGGER.debug("Cleanly exiting request generator.") + return + + if not self._is_active(): + # We have an item, but the call is closed. We should put the + # item back on the queue so that the next call can consume it. + await self._queue.put(item) + _LOGGER.debug( + "Inactive call, replacing item on queue and exiting " + "request generator." + ) + return + + yield item + + +class AsyncBidiRpc(BidiRpcBase): + """A helper for consuming a async bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + Example:: + + initial_request = example_pb2.StreamingRpcRequest( + setting='example') + rpc = AsyncBidiRpc( + stub.StreamingRpc, + initial_request=initial_request, + metadata=[('name', 'value')] + ) + + await rpc.open() + + while rpc.is_active: + print(await rpc.recv()) + await rpc.send(example_pb2.StreamingRpcRequest( + data='example')) + + This does *not* retry the stream on errors. See :class:`AsyncResumableBidiRpc`. + + Args: + start_rpc (grpc.aio.StreamStreamMultiCallable): The gRPC method used to + start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. 
+ """ + + def _create_queue(self): + """Create a queue for requests.""" + return asyncio.Queue() + + async def open(self): + """Opens the stream.""" + if self.is_active: + raise ValueError("Can not open an already open stream.") + + request_generator = _AsyncRequestQueueGenerator( + self._request_queue, initial_request=self._initial_request + ) + try: + call = await self._start_rpc(request_generator, metadata=self._rpc_metadata) + except exceptions.GoogleAPICallError as exc: + # The original `grpc.RpcError` (which is usually also a `grpc.Call`) is + # available from the ``response`` property on the mapped exception. + self._on_call_done(exc.response) + raise + + request_generator.call = call + + # TODO: api_core should expose the future interface for wrapped + # callables as well. + if hasattr(call, "_wrapped"): # pragma: NO COVER + call._wrapped.add_done_callback(self._on_call_done) + else: + call.add_done_callback(self._on_call_done) + + self._request_generator = request_generator + self.call = call + + async def close(self): + """Closes the stream.""" + if self.call is None: + return + + await self._request_queue.put(None) + self.call.cancel() + self._request_generator = None + self._initial_request = None + self._callbacks = [] + # Don't set self.call to None. Keep it around so that send/recv can + # raise the error. + + async def send(self, request): + """Queue a message to be sent on the stream. + + If the underlying RPC has been closed, this will raise. + + Args: + request (protobuf.Message): The request to send. + """ + if self.call is None: + raise ValueError("Can not send() on an RPC that has never been opened.") + + # Don't use self.is_active(), as ResumableBidiRpc will overload it + # to mean something semantically different. + if not self.call.done(): + await self._request_queue.put(request) + else: + # calling read should cause the call to raise. + await self.call.read() + + async def recv(self): + """Wait for a message to be returned from the stream. 
+ + If the underlying RPC has been closed, this will raise. + + Returns: + protobuf.Message: The received message. + """ + if self.call is None: + raise ValueError("Can not recv() on an RPC that has never been opened.") + + return await self.call.read() + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + return self.call is not None and not self.call.done() diff --git a/google/cloud/storage/_experimental/asyncio/bidi_base.py b/google/cloud/storage/_experimental/asyncio/bidi_base.py new file mode 100644 index 000000000..195e35750 --- /dev/null +++ b/google/cloud/storage/_experimental/asyncio/bidi_base.py @@ -0,0 +1,80 @@ +# Copyright 2025, Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may obtain a copy of the License at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Base class for bi-directional streaming RPC helpers.""" + + +class BidiRpcBase: + """A base class for consuming a bi-directional streaming RPC. + + This maps gRPC's built-in interface which uses a request iterator and a + response iterator into a socket-like :func:`send` and :func:`recv`. This + is a more useful pattern for long-running or asymmetric streams (streams + where there is not a direct correlation between the requests and + responses). + + This does *not* retry the stream on errors. + + Args: + start_rpc (Union[grpc.StreamStreamMultiCallable, + grpc.aio.StreamStreamMultiCallable]): The gRPC method used + to start the RPC. + initial_request (Union[protobuf.Message, + Callable[[], protobuf.Message]]): The initial request to + yield. 
This is useful if an initial request is needed to start the + stream. + metadata (Sequence[Tuple(str, str)]): RPC metadata to include in + the request. + """ + + def __init__(self, start_rpc, initial_request=None, metadata=None): + self._start_rpc = start_rpc + self._initial_request = initial_request + self._rpc_metadata = metadata + self._request_queue = self._create_queue() + self._request_generator = None + self._callbacks = [] + self.call = None + + def _create_queue(self): + """Create a queue for requests.""" + raise NotImplementedError("`_create_queue` is not implemented.") + + def add_done_callback(self, callback): + """Adds a callback that will be called when the RPC terminates. + + This occurs when the RPC errors or is successfully terminated. + + Args: + callback (Callable[[grpc.Future], None]): The callback to execute. + It will be provided with the same gRPC future as the underlying + stream which will also be a :class:`grpc.aio.Call`. + """ + self._callbacks.append(callback) + + def _on_call_done(self, future): + # This occurs when the RPC errors or is successfully terminated. + # Note that grpc's "future" here can also be a grpc.RpcError. + # See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331 + # that `grpc.RpcError` is also `grpc.aio.Call`. 
+ for callback in self._callbacks: + callback(future) + + @property + def is_active(self): + """bool: True if this stream is currently open and active.""" + raise NotImplementedError("`is_active` is not implemented.") + + @property + def pending_requests(self): + """int: Returns an estimate of the number of queued requests.""" + return self._request_queue.qsize() diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py new file mode 100644 index 000000000..43b42f8d8 --- /dev/null +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -0,0 +1,77 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +from unittest import mock + +from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( + _AsyncAbstractObjectStream, +) +from google.cloud.storage._experimental.asyncio.async_read_object_stream import ( + _AsyncReadObjectStream, +) + + +def test_inheritance(): + """Test that _AsyncReadObjectStream inherits from _AsyncAbstractObjectStream.""" + assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) + + +def test_init(): + """Test the constructor of _AsyncReadObjectStream.""" + mock_client = mock.Mock(name="client") + bucket_name = "test-bucket" + object_name = "test-object" + generation = 12345 + read_handle = "some-handle" + + # Test with all parameters + stream = _AsyncReadObjectStream( + mock_client, + bucket_name=bucket_name, + object_name=object_name, + generation_number=generation, + read_handle=read_handle, + ) + + assert stream.client is mock_client + assert stream.bucket_name == bucket_name + assert stream.object_name == object_name + assert stream.generation_number == generation + assert stream.read_handle == read_handle + + # Test with default parameters + stream_defaults = _AsyncReadObjectStream(mock_client) + assert stream_defaults.client is mock_client + assert stream_defaults.bucket_name is None + assert stream_defaults.object_name is None + assert stream_defaults.generation_number is None + assert stream_defaults.read_handle is None + + +@pytest.mark.asyncio +async def test_async_methods_are_awaitable(): + """Test that the async methods exist and are awaitable.""" + mock_client = mock.Mock(name="client") + stream = _AsyncReadObjectStream(mock_client) + + # These methods are currently empty, but we can test they are awaitable + # and don't raise exceptions. + try: + await stream.open() + await stream.close() + await stream.send(mock.Mock()) + await stream.recv() + except Exception as e: + pytest.fail(f"Async methods should be awaitable without errors. 
Raised: {e}") From dd862a2797a0f68c43c29970ae0c9a28230e995f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 12:36:59 +0000 Subject: [PATCH 04/24] complete __init__ for read_obj_str --- .../asyncio/async_read_object_stream.py | 19 +++++ .../asyncio/test_async_read_object_stream.py | 75 ++++++++++++++++++- 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 078434073..8a6251303 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -25,6 +25,8 @@ from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, ) +from google.cloud import _storage_v2 +from google.cloud.storage._experimental.asyncio.bidi_async import AsyncBidiRpc class _AsyncReadObjectStream(_AsyncAbstractObjectStream): @@ -67,6 +69,23 @@ def __init__( self.client = client self.read_handle = read_handle + self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" + + # can this interface be changed tmrw ? 
(not accounting for that) + # self.rpc = self.client.get_bidi_rpc_str_str_mc() # expose this func in GAPIC + self.rpc = self.client._client._transport._wrapped_methods[ + self.client._client._transport.bidi_read_object + ] + first_bidi_read_req = _storage_v2.BidiReadObjectRequest( + read_object_spec=_storage_v2.BidiReadObjectSpec( + bucket=self._full_bucket_name, object=object_name + ), + ) + self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),) + self.socket_like_rpc = AsyncBidiRpc( + self.rpc, initial_request=first_bidi_read_req, metadata=self.metadata + ) + async def open(self) -> None: pass diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 43b42f8d8..3b7c5653e 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -28,9 +28,24 @@ def test_inheritance(): assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) -def test_init(): +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc" +) +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_read_object_stream._storage_v2" +) +def test_init(mock_storage_v2, mock_async_bidi_rpc): """Test the constructor of _AsyncReadObjectStream.""" + # Setup mock client + mock_rpc = mock.Mock(name="rpc") + mock_transport = mock.Mock(name="transport") + mock_transport.bidi_read_object = "bidi_read_object_key" + mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} + mock_gapic_client = mock.Mock(name="gapic_client") + mock_gapic_client._transport = mock_transport mock_client = mock.Mock(name="client") + mock_client._client = mock_gapic_client + bucket_name = "test-bucket" object_name = "test-object" generation = 12345 @@ -51,6 +66,31 @@ def test_init(): assert stream.generation_number == generation assert stream.read_handle == read_handle + full_bucket_name = 
f"projects/_/buckets/{bucket_name}" + assert stream._full_bucket_name == full_bucket_name + assert stream.rpc is mock_rpc + + mock_storage_v2.BidiReadObjectSpec.assert_called_once_with( + bucket=full_bucket_name, object=object_name + ) + mock_read_object_spec = mock_storage_v2.BidiReadObjectSpec.return_value + mock_storage_v2.BidiReadObjectRequest.assert_called_once_with( + read_object_spec=mock_read_object_spec + ) + mock_initial_request = mock_storage_v2.BidiReadObjectRequest.return_value + + expected_metadata = (("x-goog-request-params", f"bucket={full_bucket_name}"),) + assert stream.metadata == expected_metadata + + mock_async_bidi_rpc.assert_called_once_with( + mock_rpc, initial_request=mock_initial_request, metadata=expected_metadata + ) + assert stream.socket_like_rpc is mock_async_bidi_rpc.return_value + + # Reset mocks for the next test case + mock_storage_v2.reset_mock() + mock_async_bidi_rpc.reset_mock() + # Test with default parameters stream_defaults = _AsyncReadObjectStream(mock_client) assert stream_defaults.client is mock_client @@ -59,11 +99,44 @@ def test_init(): assert stream_defaults.generation_number is None assert stream_defaults.read_handle is None + # The following asserts the behavior with None values. 
+ full_bucket_name_none = "projects/_/buckets/None" + assert stream_defaults._full_bucket_name == full_bucket_name_none + + mock_storage_v2.BidiReadObjectSpec.assert_called_once_with( + bucket=full_bucket_name_none, object=None + ) + mock_read_object_spec_none = mock_storage_v2.BidiReadObjectSpec.return_value + mock_storage_v2.BidiReadObjectRequest.assert_called_once_with( + read_object_spec=mock_read_object_spec_none + ) + mock_initial_request_none = mock_storage_v2.BidiReadObjectRequest.return_value + + expected_metadata_none = ( + ("x-goog-request-params", f"bucket={full_bucket_name_none}"), + ) + assert stream_defaults.metadata == expected_metadata_none + + mock_async_bidi_rpc.assert_called_once_with( + mock_rpc, + initial_request=mock_initial_request_none, + metadata=expected_metadata_none, + ) + @pytest.mark.asyncio async def test_async_methods_are_awaitable(): """Test that the async methods exist and are awaitable.""" + # Setup mock client to allow instantiation of the stream object. 
+ mock_rpc = mock.Mock(name="rpc") + mock_transport = mock.Mock(name="transport") + mock_transport.bidi_read_object = "bidi_read_object_key" + mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} + mock_gapic_client = mock.Mock(name="gapic_client") + mock_gapic_client._transport = mock_transport mock_client = mock.Mock(name="client") + mock_client._client = mock_gapic_client + stream = _AsyncReadObjectStream(mock_client) # These methods are currently empty, but we can test they are awaitable From aaabfd7309a8008dde4ac7bd5c12e5da453c30f7 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 17:04:46 +0000 Subject: [PATCH 05/24] remove unuseful comments --- .../storage/_experimental/asyncio/async_read_object_stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 8a6251303..93b2bc293 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -71,8 +71,6 @@ def __init__( self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" - # can this interface be changed tmrw ? 
(not accounting for that) - # self.rpc = self.client.get_bidi_rpc_str_str_mc() # expose this func in GAPIC self.rpc = self.client._client._transport._wrapped_methods[ self.client._client._transport.bidi_read_object ] From 23eea966178b88baf684c6aad6808372b255ed25 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Wed, 17 Sep 2025 17:27:08 +0000 Subject: [PATCH 06/24] add methods open close send recv --- .../asyncio/async_read_object_stream.py | 72 ++++++++--- .../asyncio/test_async_read_object_stream.py | 117 +++++++++++++----- 2 files changed, 142 insertions(+), 47 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 93b2bc293..4f9383ced 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -22,12 +22,18 @@ """ +from typing import Optional + from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, ) from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.bidi_async import AsyncBidiRpc +from google.cloud.storage._experimental.asyncio.async_grpc_client import ( + AsyncGrpcClient, +) + class _AsyncReadObjectStream(_AsyncAbstractObjectStream): """Provides an asynchronous, streaming interface for reading from a GCS object. 
@@ -55,12 +61,12 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): def __init__( self, - client, - bucket_name=None, - object_name=None, - generation_number=None, - read_handle=None, - ): + client: AsyncGrpcClient, + bucket_name: Optional[str] = None, + object_name: Optional[str] = None, + generation_number: Optional[int] = None, + read_handle: Optional[str] = None, + ) -> None: super().__init__( bucket_name=bucket_name, object_name=object_name, @@ -85,13 +91,47 @@ def __init__( ) async def open(self) -> None: - pass - - async def close(self): - pass - - async def send(self, bidi_read_object_request): - pass - - async def recv(self): - pass + """Opens the bidi-gRPC connection to read from the object. + + This method sends an initial request to start the stream and receives + the first response containing metadata and a read handle. + """ + await self.socket_like_rpc.open() # this is actually 1 send + response = await self.socket_like_rpc.recv() + if self.generation_number is None: + self.generation_number = response.metadata.generation + + self.read_handle = response.read_handle + + return + + async def close(self) -> None: + """Closes the bidi-gRPC connection.""" + await self.socket_like_rpc.close() + return + + async def send( + self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest + ) -> None: + """Sends a request message on the stream. + + Args: + bidi_read_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`): + The request message to send. This is typically used to specify + the read offset and limit. + """ + await self.socket_like_rpc.send(bidi_read_object_request) + return + + async def recv(self) -> _storage_v2.BidiReadObjectResponse: + """Receives a response from the stream. + + This method waits for the next message from the server, which could + contain object data or metadata. + + Returns: + :class:`~google.cloud._storage_v2.types.BidiReadObjectResponse`: + The response message from the server. 
+ """ + bidi_read_object_response = await self.socket_like_rpc.recv() + return bidi_read_object_response diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 3b7c5653e..cbd408cdf 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -23,6 +23,20 @@ ) +@pytest.fixture +def mock_client(): + """A mock client for testing.""" + mock_rpc = mock.Mock(name="rpc") + mock_transport = mock.Mock(name="transport") + mock_transport.bidi_read_object = "bidi_read_object_key" + mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} + mock_gapic_client = mock.Mock(name="gapic_client") + mock_gapic_client._transport = mock_transport + mock_client = mock.Mock(name="client") + mock_client._client = mock_gapic_client + return mock_client + + def test_inheritance(): """Test that _AsyncReadObjectStream inherits from _AsyncAbstractObjectStream.""" assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) @@ -34,18 +48,9 @@ def test_inheritance(): @mock.patch( "google.cloud.storage._experimental.asyncio.async_read_object_stream._storage_v2" ) -def test_init(mock_storage_v2, mock_async_bidi_rpc): +def test_init(mock_storage_v2, mock_async_bidi_rpc, mock_client): """Test the constructor of _AsyncReadObjectStream.""" - # Setup mock client - mock_rpc = mock.Mock(name="rpc") - mock_transport = mock.Mock(name="transport") - mock_transport.bidi_read_object = "bidi_read_object_key" - mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} - mock_gapic_client = mock.Mock(name="gapic_client") - mock_gapic_client._transport = mock_transport - mock_client = mock.Mock(name="client") - mock_client._client = mock_gapic_client - + mock_rpc = mock_client._client._transport._wrapped_methods["bidi_read_object_key"] bucket_name = "test-bucket" object_name = "test-object" generation = 12345 @@ -125,26 +130,76 @@ def 
test_init(mock_storage_v2, mock_async_bidi_rpc): @pytest.mark.asyncio -async def test_async_methods_are_awaitable(): - """Test that the async methods exist and are awaitable.""" - # Setup mock client to allow instantiation of the stream object. - mock_rpc = mock.Mock(name="rpc") - mock_transport = mock.Mock(name="transport") - mock_transport.bidi_read_object = "bidi_read_object_key" - mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} - mock_gapic_client = mock.Mock(name="gapic_client") - mock_gapic_client._transport = mock_transport - mock_client = mock.Mock(name="client") - mock_client._client = mock_gapic_client +async def test_open(mock_client): + """Test open() when generation_number is initially None.""" + stream = _AsyncReadObjectStream(mock_client, bucket_name="b", object_name="o") + stream.socket_like_rpc = mock.AsyncMock() + stream.generation_number = None # Explicitly set for clarity + + mock_response = mock.Mock() + mock_response.metadata.generation = 98765 + mock_response.read_handle = "test-read-handle" + stream.socket_like_rpc.recv.return_value = mock_response + + await stream.open() + + stream.socket_like_rpc.open.assert_awaited_once() + stream.socket_like_rpc.recv.assert_awaited_once() + assert stream.generation_number == 98765 + assert stream.read_handle == "test-read-handle" + + +@pytest.mark.asyncio +async def test_open_with_generation_set(mock_client): + """Test open() when generation_number is already set.""" + initial_generation = 12345 + stream = _AsyncReadObjectStream( + mock_client, + bucket_name="b", + object_name="o", + generation_number=initial_generation, + ) + stream.socket_like_rpc = mock.AsyncMock() + mock_response = mock.Mock() + mock_response.metadata.generation = 98765 + mock_response.read_handle = "test-read-handle" + stream.socket_like_rpc.recv.return_value = mock_response + + await stream.open() + + stream.socket_like_rpc.open.assert_awaited_once() + stream.socket_like_rpc.recv.assert_awaited_once() + assert 
stream.generation_number == initial_generation # Should not change + assert stream.read_handle == "test-read-handle" + + +@pytest.mark.asyncio +async def test_close(mock_client): + """Test close().""" + stream = _AsyncReadObjectStream(mock_client) + stream.socket_like_rpc = mock.AsyncMock() + await stream.close() + stream.socket_like_rpc.close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_send(mock_client): + """Test send().""" stream = _AsyncReadObjectStream(mock_client) + stream.socket_like_rpc = mock.AsyncMock() + mock_request = mock.Mock() + await stream.send(mock_request) + stream.socket_like_rpc.send.assert_awaited_once_with(mock_request) - # These methods are currently empty, but we can test they are awaitable - # and don't raise exceptions. - try: - await stream.open() - await stream.close() - await stream.send(mock.Mock()) - await stream.recv() - except Exception as e: - pytest.fail(f"Async methods should be awaitable without errors. Raised: {e}") + +@pytest.mark.asyncio +async def test_recv(mock_client): + """Test recv().""" + stream = _AsyncReadObjectStream(mock_client) + stream.socket_like_rpc = mock.AsyncMock() + mock_response = mock.Mock() + stream.socket_like_rpc.recv.return_value = mock_response + response = await stream.recv() + stream.socket_like_rpc.recv.assert_awaited_once() + assert response is mock_response From 71a7a796d7364cf90f6836240a1eada54dd5311d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 04:14:01 +0000 Subject: [PATCH 07/24] change read_handle type from 'str' to 'bytes' --- .../_experimental/asyncio/async_read_object_stream.py | 4 ++-- tests/unit/asyncio/test_async_read_object_stream.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 4f9383ced..497c040b5 100644 --- 
a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -54,7 +54,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: object + :type read_handle: bytes :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. """ @@ -65,7 +65,7 @@ def __init__( bucket_name: Optional[str] = None, object_name: Optional[str] = None, generation_number: Optional[int] = None, - read_handle: Optional[str] = None, + read_handle: Optional[bytes] = None, ) -> None: super().__init__( bucket_name=bucket_name, diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index cbd408cdf..0792bb731 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -54,7 +54,7 @@ def test_init(mock_storage_v2, mock_async_bidi_rpc, mock_client): bucket_name = "test-bucket" object_name = "test-object" generation = 12345 - read_handle = "some-handle" + read_handle = b"some-handle" # Test with all parameters stream = _AsyncReadObjectStream( @@ -138,7 +138,7 @@ async def test_open(mock_client): mock_response = mock.Mock() mock_response.metadata.generation = 98765 - mock_response.read_handle = "test-read-handle" + mock_response.read_handle = b"test-read-handle" stream.socket_like_rpc.recv.return_value = mock_response await stream.open() @@ -146,7 +146,7 @@ async def test_open(mock_client): stream.socket_like_rpc.open.assert_awaited_once() stream.socket_like_rpc.recv.assert_awaited_once() assert stream.generation_number == 98765 - assert stream.read_handle == "test-read-handle" + assert stream.read_handle == b"test-read-handle" @pytest.mark.asyncio @@ -163,7 +163,7 @@ async def 
test_open_with_generation_set(mock_client): mock_response = mock.Mock() mock_response.metadata.generation = 98765 - mock_response.read_handle = "test-read-handle" + mock_response.read_handle = b"test-read-handle" stream.socket_like_rpc.recv.return_value = mock_response await stream.open() @@ -171,7 +171,7 @@ async def test_open_with_generation_set(mock_client): stream.socket_like_rpc.open.assert_awaited_once() stream.socket_like_rpc.recv.assert_awaited_once() assert stream.generation_number == initial_generation # Should not change - assert stream.read_handle == "test-read-handle" + assert stream.read_handle == b"test-read-handle" @pytest.mark.asyncio From 0810afc315e29a4f9caa25c08faa7840281de545 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:00:17 +0000 Subject: [PATCH 08/24] fix doc strings, add licence and type hints --- .../asyncio/async_abstract_object_stream.py | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 86de7f715..03e2c5690 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -1,36 +1,61 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ import abc +from typing import Any, Optional class _AsyncAbstractObjectStream(abc.ABC): - """ - Abstract class for both ReadObjectStream as well as WriteObjectStream. + """Abstract base class for asynchronous object streams. + + This class defines the common interface for both reading from and writing + to a GCS object in a streaming fashion. - Attributes will include - 1. bucket_name - 2. object_name - 3. generation_number (if given) + :type bucket_name: str + :param bucket_name: (Optional) The name of the bucket containing the object. + :type object_name: str + :param object_name: (Optional) The name of the object. + :type generation_number: int + :param generation_number: (Optional) If present, selects a specific revision of + this object. """ - def __init__(self, bucket_name, object_name, generation_number=None): + def __init__( + self, + bucket_name: Optional[str] = None, + object_name: Optional[str] = None, + generation_number: Optional[int] = None, + ) -> None: super().__init__() - self.bucket_name = bucket_name - self.object_name = object_name - self.generation_number = generation_number + self.bucket_name: Optional[str] = bucket_name + self.object_name: Optional[str] = object_name + self.generation_number: Optional[int] = generation_number @abc.abstractmethod - async def open(self): + async def open(self) -> None: raise NotImplementedError("Subclasses should implement this method.") @abc.abstractmethod - async def close(self): + async def close(self) -> None: raise NotImplementedError("Subclasses should implement this method.") @abc.abstractmethod - async def send(self): + async def send(self, message: Any) -> None: raise NotImplementedError("Subclasses should implement this method.") @abc.abstractmethod - async def recv(self): + async def recv(self) -> Any: raise NotImplementedError("Subclasses should implement this method.") From a14bc6895ad704beb57438b55ede8f6487b50f9e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:16:30 
+0000 Subject: [PATCH 09/24] pass abstract methods --- .../_experimental/asyncio/async_abstract_object_stream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 03e2c5690..752a058c1 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -46,16 +46,16 @@ def __init__( @abc.abstractmethod async def open(self) -> None: - raise NotImplementedError("Subclasses should implement this method.") + pass @abc.abstractmethod async def close(self) -> None: - raise NotImplementedError("Subclasses should implement this method.") + pass @abc.abstractmethod async def send(self, message: Any) -> None: - raise NotImplementedError("Subclasses should implement this method.") + pass @abc.abstractmethod async def recv(self) -> Any: - raise NotImplementedError("Subclasses should implement this method.") + pass From 635ad07ea962f0bb0df491ac57631d167be29369 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:29:33 +0000 Subject: [PATCH 10/24] add handle param --- .../_experimental/asyncio/async_abstract_object_stream.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 752a058c1..28a92ccb4 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -31,6 +31,10 @@ class _AsyncAbstractObjectStream(abc.ABC): :type generation_number: int :param generation_number: (Optional) If present, selects a specific revision of this object. 
+ + :type handle: bytes + :param handle: (Optional) The handle for the object, could be read_handle or + write_handle, based on how the stream is used. """ def __init__( @@ -38,11 +42,13 @@ def __init__( bucket_name: Optional[str] = None, object_name: Optional[str] = None, generation_number: Optional[int] = None, + handle: Optional[bytes] = None, ) -> None: super().__init__() self.bucket_name: Optional[str] = bucket_name self.object_name: Optional[str] = object_name self.generation_number: Optional[int] = generation_number + self.handle: Optional[bytes] = handle @abc.abstractmethod async def open(self) -> None: From ba453d4617a6c197078e9a3783730d48d5e3b1a3 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 11:36:07 +0000 Subject: [PATCH 11/24] include handle in tests --- tests/unit/asyncio/test_async_abstract_object_stream.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py index e0dc130ea..d81c9ef56 100644 --- a/tests/unit/asyncio/test_async_abstract_object_stream.py +++ b/tests/unit/asyncio/test_async_abstract_object_stream.py @@ -39,18 +39,23 @@ def test_init(): bucket_name = "test-bucket" object_name = "test-object" generation = 12345 + handle = b"test-handle" # Test with all parameters - stream = _ConcreteStream(bucket_name, object_name, generation_number=generation) + stream = _ConcreteStream( + bucket_name, object_name, generation_number=generation, handle=handle + ) assert stream.bucket_name == bucket_name assert stream.object_name == object_name assert stream.generation_number == generation + assert stream.handle == handle # Test with default generation_number stream_no_gen = _ConcreteStream(bucket_name, object_name) assert stream_no_gen.bucket_name == bucket_name assert stream_no_gen.object_name == object_name assert stream_no_gen.generation_number is None + assert stream_no_gen.handle is None def 
test_instantiation_fails_without_implementation(): From 800c6df51e19123b022b9aa336cda59f939444bf Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 14:59:47 +0000 Subject: [PATCH 12/24] remove unit tests for abstract class --- .../asyncio/async_abstract_object_stream.py | 8 +-- .../test_async_abstract_object_stream.py | 69 ------------------- 2 files changed, 4 insertions(+), 73 deletions(-) delete mode 100644 tests/unit/asyncio/test_async_abstract_object_stream.py diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 28a92ccb4..325089ba5 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -17,10 +17,10 @@ class _AsyncAbstractObjectStream(abc.ABC): - """Abstract base class for asynchronous object streams. + """Abstract base class to represent gRPC stream for GCS ``Object``. - This class defines the common interface for both reading from and writing - to a GCS object in a streaming fashion. + Concrete implementation of this class could be ``_AsyncReadObjectStream`` + or ``_AsyncWriteObjectStream``. :type bucket_name: str :param bucket_name: (Optional) The name of the bucket containing the object. 
@@ -59,7 +59,7 @@ async def close(self) -> None: pass @abc.abstractmethod - async def send(self, message: Any) -> None: + async def send(self, protobuf: Any) -> None: pass @abc.abstractmethod diff --git a/tests/unit/asyncio/test_async_abstract_object_stream.py b/tests/unit/asyncio/test_async_abstract_object_stream.py deleted file mode 100644 index d81c9ef56..000000000 --- a/tests/unit/asyncio/test_async_abstract_object_stream.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( - _AsyncAbstractObjectStream, -) - - -# A concrete implementation for testing purposes. 
-class _ConcreteStream(_AsyncAbstractObjectStream): - async def open(self): - pass - - async def close(self): - pass - - async def send(self): - pass - - async def recv(self): - pass - - -def test_init(): - """Test the constructor of AsyncAbstractObjectStream.""" - bucket_name = "test-bucket" - object_name = "test-object" - generation = 12345 - handle = b"test-handle" - - # Test with all parameters - stream = _ConcreteStream( - bucket_name, object_name, generation_number=generation, handle=handle - ) - assert stream.bucket_name == bucket_name - assert stream.object_name == object_name - assert stream.generation_number == generation - assert stream.handle == handle - - # Test with default generation_number - stream_no_gen = _ConcreteStream(bucket_name, object_name) - assert stream_no_gen.bucket_name == bucket_name - assert stream_no_gen.object_name == object_name - assert stream_no_gen.generation_number is None - assert stream_no_gen.handle is None - - -def test_instantiation_fails_without_implementation(): - """Test that instantiating an incomplete subclass raises TypeError.""" - - class _IncompleteStream(_AsyncAbstractObjectStream): - # Missing implementations for abstract methods like open(), close(), etc. 
- pass - - with pytest.raises(TypeError, match="Can't instantiate abstract class"): - _IncompleteStream("bucket", "object") From 18529adc0d2765721ccaa1836ef5080b775602fd Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 15:23:31 +0000 Subject: [PATCH 13/24] edit doc string for _AsyncReadObjectStream --- .../asyncio/async_read_object_stream.py | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 078434073..a3818d2fa 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -22,59 +22,64 @@ """ +from typing import Any, Optional +from google.cloud import _storage_v2 +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, ) class _AsyncReadObjectStream(_AsyncAbstractObjectStream): - """Provides an asynchronous, streaming interface for reading from a GCS object. + """Class representing a gRPC bidi-stream for reading data from a GCS ``Object``. - This class provides a unix socket-like interface to a GCS Object, with + This class provides a unix socket-like interface to a GCS ``Object``, with methods like ``open``, ``close``, ``send``, and ``recv``. - :type client: :class:`~google.cloud.storage.aio.Client` - :param client: The asynchronous client to use for making API requests. + :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient` + :param client: async grpc client to use for making API requests. :type bucket_name: str - :param bucket_name: The name of the bucket containing the object. + :param bucket_name: The name of the GCS ``bucket`` containing the object. 
:type object_name: str - :param object_name: The name of the object to be read. + :param object_name: The name of the GCS ``object`` to be read. :type generation_number: int :param generation_number: (Optional) If present, selects a specific revision of this object. - :type read_handle: object + :type read_handle: bytes :param read_handle: (Optional) An existing handle for reading the object. If provided, opening the bidi-gRPC connection will be faster. """ def __init__( self, - client, - bucket_name=None, - object_name=None, - generation_number=None, - read_handle=None, - ): + client: AsyncGrpcClient, + bucket_name: Optional[str] = None, + object_name: Optional[str] = None, + generation_number: Optional[int] = None, + read_handle: Optional[bytes] = None, + ) -> None: super().__init__( bucket_name=bucket_name, object_name=object_name, generation_number=generation_number, ) - self.client = client - self.read_handle = read_handle + self.client: AsyncGrpcClient = client + self.read_handle: Optional[bytes] = read_handle async def open(self) -> None: pass - async def close(self): + async def close(self) -> None: pass - async def send(self, bidi_read_object_request): + async def send( + self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest + ) -> None: pass - async def recv(self): + async def recv(self) -> Any: pass From b4da1acd87c686d396856fa3d83a5d221b39f4f1 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 18:14:52 +0000 Subject: [PATCH 14/24] refactor unit tests for async_read_object_stream --- .../asyncio/async_read_object_stream.py | 6 +- .../asyncio/test_async_read_object_stream.py | 132 +++--------------- 2 files changed, 24 insertions(+), 114 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 47568b005..735dd7f80 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ 
b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -38,7 +38,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): This class provides a unix socket-like interface to a GCS ``Object``, with methods like ``open``, ``close``, ``send``, and ``recv``. - :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient` + :type client: :class:`~google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client` :param client: async grpc client to use for making API requests. :type bucket_name: str @@ -58,7 +58,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): def __init__( self, - client: AsyncGrpcClient, + client: AsyncGrpcClient.grpc_client, bucket_name: Optional[str] = None, object_name: Optional[str] = None, generation_number: Optional[int] = None, @@ -69,7 +69,7 @@ def __init__( object_name=object_name, generation_number=generation_number, ) - self.client: AsyncGrpcClient = client + self.client: AsyncGrpcClient.grpc_client = client self.read_handle: Optional[bytes] = read_handle self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}" diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 3b7c5653e..b15ea60af 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -14,6 +14,7 @@ import pytest from unittest import mock +from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, @@ -23,128 +24,37 @@ ) -def test_inheritance(): - """Test that _AsyncReadObjectStream inherits from _AsyncAbstractObjectStream.""" - assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) - - @mock.patch( "google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc" ) @mock.patch( - 
"google.cloud.storage._experimental.asyncio.async_read_object_stream._storage_v2" + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" ) -def test_init(mock_storage_v2, mock_async_bidi_rpc): - """Test the constructor of _AsyncReadObjectStream.""" - # Setup mock client - mock_rpc = mock.Mock(name="rpc") - mock_transport = mock.Mock(name="transport") - mock_transport.bidi_read_object = "bidi_read_object_key" - mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} - mock_gapic_client = mock.Mock(name="gapic_client") - mock_gapic_client._transport = mock_transport - mock_client = mock.Mock(name="client") - mock_client._client = mock_gapic_client - +def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc): + # initialize with bucket, object_name and generation number. & client. bucket_name = "test-bucket" object_name = "test-object" - generation = 12345 - read_handle = "some-handle" - - # Test with all parameters - stream = _AsyncReadObjectStream( - mock_client, + generation_number = 12345 + mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc" + mock_client._client._transport._wrapped_methods = { + "bidi_read_object_rpc": mock.sentinel.A + } + + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, bucket_name=bucket_name, object_name=object_name, - generation_number=generation, - read_handle=read_handle, + generation_number=generation_number, ) - - assert stream.client is mock_client - assert stream.bucket_name == bucket_name - assert stream.object_name == object_name - assert stream.generation_number == generation - assert stream.read_handle == read_handle - full_bucket_name = f"projects/_/buckets/{bucket_name}" - assert stream._full_bucket_name == full_bucket_name - assert stream.rpc is mock_rpc - - mock_storage_v2.BidiReadObjectSpec.assert_called_once_with( - bucket=full_bucket_name, object=object_name + first_bidi_read_req = _storage_v2.BidiReadObjectRequest( + 
read_object_spec=_storage_v2.BidiReadObjectSpec( + bucket=full_bucket_name, object=object_name + ), ) - mock_read_object_spec = mock_storage_v2.BidiReadObjectSpec.return_value - mock_storage_v2.BidiReadObjectRequest.assert_called_once_with( - read_object_spec=mock_read_object_spec - ) - mock_initial_request = mock_storage_v2.BidiReadObjectRequest.return_value - - expected_metadata = (("x-goog-request-params", f"bucket={full_bucket_name}"),) - assert stream.metadata == expected_metadata - mock_async_bidi_rpc.assert_called_once_with( - mock_rpc, initial_request=mock_initial_request, metadata=expected_metadata + mock.sentinel.A, + initial_request=first_bidi_read_req, + metadata=(("x-goog-request-params", f"bucket={full_bucket_name}"),), ) - assert stream.socket_like_rpc is mock_async_bidi_rpc.return_value - - # Reset mocks for the next test case - mock_storage_v2.reset_mock() - mock_async_bidi_rpc.reset_mock() - - # Test with default parameters - stream_defaults = _AsyncReadObjectStream(mock_client) - assert stream_defaults.client is mock_client - assert stream_defaults.bucket_name is None - assert stream_defaults.object_name is None - assert stream_defaults.generation_number is None - assert stream_defaults.read_handle is None - - # The following asserts the behavior with None values. 
- full_bucket_name_none = "projects/_/buckets/None" - assert stream_defaults._full_bucket_name == full_bucket_name_none - - mock_storage_v2.BidiReadObjectSpec.assert_called_once_with( - bucket=full_bucket_name_none, object=None - ) - mock_read_object_spec_none = mock_storage_v2.BidiReadObjectSpec.return_value - mock_storage_v2.BidiReadObjectRequest.assert_called_once_with( - read_object_spec=mock_read_object_spec_none - ) - mock_initial_request_none = mock_storage_v2.BidiReadObjectRequest.return_value - - expected_metadata_none = ( - ("x-goog-request-params", f"bucket={full_bucket_name_none}"), - ) - assert stream_defaults.metadata == expected_metadata_none - - mock_async_bidi_rpc.assert_called_once_with( - mock_rpc, - initial_request=mock_initial_request_none, - metadata=expected_metadata_none, - ) - - -@pytest.mark.asyncio -async def test_async_methods_are_awaitable(): - """Test that the async methods exist and are awaitable.""" - # Setup mock client to allow instantiation of the stream object. - mock_rpc = mock.Mock(name="rpc") - mock_transport = mock.Mock(name="transport") - mock_transport.bidi_read_object = "bidi_read_object_key" - mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} - mock_gapic_client = mock.Mock(name="gapic_client") - mock_gapic_client._transport = mock_transport - mock_client = mock.Mock(name="client") - mock_client._client = mock_gapic_client - - stream = _AsyncReadObjectStream(mock_client) - - # These methods are currently empty, but we can test they are awaitable - # and don't raise exceptions. - try: - await stream.open() - await stream.close() - await stream.send(mock.Mock()) - await stream.recv() - except Exception as e: - pytest.fail(f"Async methods should be awaitable without errors. 
Raised: {e}") + assert read_obj_stream.socket_like_rpc is mock_async_bidi_rpc.return_value From 6dec6c692779e26ddcf30588375f4d0260a7102d Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 18:31:13 +0000 Subject: [PATCH 15/24] bucket_name and object_name cannot be NONE --- .../_experimental/asyncio/async_abstract_object_stream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 325089ba5..1ba5aef9b 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -39,14 +39,14 @@ class _AsyncAbstractObjectStream(abc.ABC): def __init__( self, - bucket_name: Optional[str] = None, - object_name: Optional[str] = None, + bucket_name: str, + object_name: str, generation_number: Optional[int] = None, handle: Optional[bytes] = None, ) -> None: super().__init__() - self.bucket_name: Optional[str] = bucket_name - self.object_name: Optional[str] = object_name + self.bucket_name: str = bucket_name + self.object_name: str = object_name self.generation_number: Optional[int] = generation_number self.handle: Optional[bytes] = handle From a15490527a4df40dbc89e631f2b4cee6acfda694 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Thu, 18 Sep 2025 18:35:32 +0000 Subject: [PATCH 16/24] bucket_name and object_name cannot be None --- .../_experimental/asyncio/async_read_object_stream.py | 4 ++-- tests/unit/asyncio/test_async_read_object_stream.py | 10 ++++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index a3818d2fa..96639e0df 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ 
b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -57,8 +57,8 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): def __init__( self, client: AsyncGrpcClient, - bucket_name: Optional[str] = None, - object_name: Optional[str] = None, + bucket_name: str, + object_name: str, generation_number: Optional[int] = None, read_handle: Optional[bytes] = None, ) -> None: diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 43b42f8d8..a033851cc 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -52,10 +52,12 @@ def test_init(): assert stream.read_handle == read_handle # Test with default parameters - stream_defaults = _AsyncReadObjectStream(mock_client) + stream_defaults = _AsyncReadObjectStream( + mock_client, bucket_name=bucket_name, object_name=object_name + ) assert stream_defaults.client is mock_client - assert stream_defaults.bucket_name is None - assert stream_defaults.object_name is None + assert stream_defaults.bucket_name is bucket_name + assert stream_defaults.object_name is object_name assert stream_defaults.generation_number is None assert stream_defaults.read_handle is None @@ -64,7 +66,7 @@ def test_init(): async def test_async_methods_are_awaitable(): """Test that the async methods exist and are awaitable.""" mock_client = mock.Mock(name="client") - stream = _AsyncReadObjectStream(mock_client) + stream = _AsyncReadObjectStream(mock_client, "bucket", "object") # These methods are currently empty, but we can test they are awaitable # and don't raise exceptions. 
From 14a2abaff55d46247cd959d0d36259dd5df3278e Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 19 Sep 2025 06:13:41 +0000 Subject: [PATCH 17/24] simplyfy tests for open --- .../asyncio/test_async_read_object_stream.py | 74 +++++++++++-------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index e405c43f6..b98d786c6 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -14,6 +14,7 @@ import pytest from unittest import mock +from unittest.mock import AsyncMock from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( @@ -23,24 +24,10 @@ _AsyncReadObjectStream, ) - -@pytest.fixture -def mock_client(): - """A mock client for testing.""" - mock_rpc = mock.Mock(name="rpc") - mock_transport = mock.Mock(name="transport") - mock_transport.bidi_read_object = "bidi_read_object_key" - mock_transport._wrapped_methods = {"bidi_read_object_key": mock_rpc} - mock_gapic_client = mock.Mock(name="gapic_client") - mock_gapic_client._transport = mock_transport - mock_client = mock.Mock(name="client") - mock_client._client = mock_gapic_client - return mock_client - - -def test_inheritance(): - """Test that _AsyncReadObjectStream inherits from _AsyncAbstractObjectStream.""" - assert issubclass(_AsyncReadObjectStream, _AsyncAbstractObjectStream) +_TEST_BUCKET_NAME = "test-bucket" +_TEST_OBJECT_NAME = "test-object" +_TEST_GENERATION_NUMBER = 12345 +_TEST_READ_HANDLE = b"test-read-handle" @mock.patch( @@ -50,30 +37,55 @@ def test_inheritance(): "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" ) def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc): - # initialize with bucket, object_name and generation number. & client. 
- bucket_name = "test-bucket" - object_name = "test-object" - generation_number = 12345 + mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc" mock_client._client._transport._wrapped_methods = { "bidi_read_object_rpc": mock.sentinel.A } - - read_obj_stream = _AsyncReadObjectStream( - client=mock_client, - bucket_name=bucket_name, - object_name=object_name, - generation_number=generation_number, - ) - full_bucket_name = f"projects/_/buckets/{bucket_name}" + full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}" first_bidi_read_req = _storage_v2.BidiReadObjectRequest( read_object_spec=_storage_v2.BidiReadObjectSpec( - bucket=full_bucket_name, object=object_name + bucket=full_bucket_name, object=_TEST_OBJECT_NAME ), ) + + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + generation_number=_TEST_GENERATION_NUMBER, + ) + mock_async_bidi_rpc.assert_called_once_with( mock.sentinel.A, initial_request=first_bidi_read_req, metadata=(("x-goog-request-params", f"bucket={full_bucket_name}"),), ) assert read_obj_stream.socket_like_rpc is mock_async_bidi_rpc.return_value + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_open(mock_client): + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + read_obj_stream.socket_like_rpc.open = AsyncMock() + + recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse) + recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object) + recv_response.metadata.generation = _TEST_GENERATION_NUMBER + recv_response.read_handle = _TEST_READ_HANDLE + read_obj_stream.socket_like_rpc.recv = AsyncMock(return_value=recv_response) + + await read_obj_stream.open() + + read_obj_stream.socket_like_rpc.open.assert_called_once() + 
read_obj_stream.socket_like_rpc.recv.assert_called_once() + + assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER + assert read_obj_stream.read_handle == _TEST_READ_HANDLE From 078afca48bff53bd01c7edca409e8c69557c4c2b Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 19 Sep 2025 06:24:45 +0000 Subject: [PATCH 18/24] simply tests for send recv and close --- .../asyncio/async_read_object_stream.py | 7 +- .../asyncio/test_async_read_object_stream.py | 70 +++++++++++++++++++ 2 files changed, 71 insertions(+), 6 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 01906d999..01272e91b 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -104,12 +104,9 @@ async def open(self) -> None: self.read_handle = response.read_handle - return - async def close(self) -> None: """Closes the bidi-gRPC connection.""" await self.socket_like_rpc.close() - return async def send( self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest @@ -122,7 +119,6 @@ async def send( the read offset and limit. """ await self.socket_like_rpc.send(bidi_read_object_request) - return async def recv(self) -> _storage_v2.BidiReadObjectResponse: """Receives a response from the stream. @@ -134,5 +130,4 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse: :class:`~google.cloud._storage_v2.types.BidiReadObjectResponse`: The response message from the server. 
""" - bidi_read_object_response = await self.socket_like_rpc.recv() - return bidi_read_object_response + return await self.socket_like_rpc.recv() diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index b98d786c6..aa99821fd 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -69,6 +69,7 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc): ) @pytest.mark.asyncio async def test_open(mock_client): + # arrange read_obj_stream = _AsyncReadObjectStream( client=mock_client, bucket_name=_TEST_BUCKET_NAME, @@ -82,10 +83,79 @@ async def test_open(mock_client): recv_response.read_handle = _TEST_READ_HANDLE read_obj_stream.socket_like_rpc.recv = AsyncMock(return_value=recv_response) + # act await read_obj_stream.open() + # assert read_obj_stream.socket_like_rpc.open.assert_called_once() read_obj_stream.socket_like_rpc.recv.assert_called_once() assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER assert read_obj_stream.read_handle == _TEST_READ_HANDLE + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_close(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + read_obj_stream.socket_like_rpc.close = AsyncMock() + + # act + await read_obj_stream.close() + + # assert + read_obj_stream.socket_like_rpc.close.assert_called_once() + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_send(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + read_obj_stream.socket_like_rpc.send = 
AsyncMock() + + # act + bidi_read_object_request = _storage_v2.BidiReadObjectRequest() + await read_obj_stream.send(bidi_read_object_request) + + # assert + read_obj_stream.socket_like_rpc.send.assert_called_once_with( + bidi_read_object_request + ) + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_recv(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + bidi_read_object_response = _storage_v2.BidiReadObjectResponse() + read_obj_stream.socket_like_rpc.recv = AsyncMock( + return_value=bidi_read_object_response + ) + + # act + response = await read_obj_stream.recv() + + # assert + read_obj_stream.socket_like_rpc.recv.assert_called_once() + assert response == bidi_read_object_response From 2054989c7b83ba86d2461e046c972d61e1cbbe49 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 19 Sep 2025 06:26:06 +0000 Subject: [PATCH 19/24] minor edit - add bidi-stream in doc string --- .../_experimental/asyncio/async_abstract_object_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py index 1ba5aef9b..49d7a293a 100644 --- a/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_abstract_object_stream.py @@ -17,7 +17,7 @@ class _AsyncAbstractObjectStream(abc.ABC): - """Abstract base class to represent gRPC stream for GCS ``Object``. + """Abstract base class to represent gRPC bidi-stream for GCS ``Object``. Concrete implementation of this class could be ``_AsyncReadObjectStream`` or ``_AsyncWriteObjectStream``. 
From 0e60694e0b3f2cdab140b878dcf4d1c686bbe90a Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Fri, 19 Sep 2025 11:05:00 +0000 Subject: [PATCH 20/24] add checks for invalid inputs --- .../asyncio/async_read_object_stream.py | 15 +++++++++++---- .../unit/asyncio/test_async_read_object_stream.py | 7 +++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 96639e0df..bedfbf7ba 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -16,7 +16,7 @@ This is _experimental module for upcoming support for Rapid Storage. (https://cloud.google.com/blog/products/storage-data-transfer/high-performance-storage-innovations-for-ai-hpc#:~:text=your%20AI%20workloads%3A-,Rapid%20Storage,-%3A%20A%20new) -APIs may not work as intented and are not stable yet. Feature is not +APIs may not work as intended and are not stable yet. Feature is not GA(Generally Available) yet, please contact your TAM(Technical Account Manager) if you want to use these APIs. @@ -36,7 +36,7 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): This class provides a unix socket-like interface to a GCS ``Object``, with methods like ``open``, ``close``, ``send``, and ``recv``. - :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient` + :type client: :class:`~google.cloud.storage.asyncio.AsyncGrpcClient.grpc_client` :param client: async grpc client to use for making API requests. 
:type bucket_name: str @@ -56,18 +56,25 @@ class _AsyncReadObjectStream(_AsyncAbstractObjectStream): def __init__( self, - client: AsyncGrpcClient, + client: AsyncGrpcClient.grpc_client, bucket_name: str, object_name: str, generation_number: Optional[int] = None, read_handle: Optional[bytes] = None, ) -> None: + if client is None: + raise ValueError("client must be provided") + if bucket_name is None: + raise ValueError("bucket_name must be provided") + if object_name is None: + raise ValueError("object_name must be provided") + super().__init__( bucket_name=bucket_name, object_name=object_name, generation_number=generation_number, ) - self.client: AsyncGrpcClient = client + self.client: AsyncGrpcClient.grpc_client = client self.read_handle: Optional[bytes] = read_handle async def open(self) -> None: diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index a033851cc..89d0571b0 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -62,6 +62,13 @@ def test_init(): assert stream_defaults.read_handle is None +def test_init_with_invalid_parameters(): + """Test the constructor of _AsyncReadObjectStream with invalid params.""" + + with pytest.raises(ValueError): + _AsyncReadObjectStream(None, bucket_name=None, object_name=None) + + @pytest.mark.asyncio async def test_async_methods_are_awaitable(): """Test that the async methods exist and are awaitable.""" From dcb6a552507a3e899f19f500e6c790c7fea92e62 Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 23 Sep 2025 05:36:37 +0000 Subject: [PATCH 21/24] remove duplicated import --- .../storage/_experimental/asyncio/async_read_object_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 8561d499f..0ac229c5a 100644 --- 
a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -28,7 +28,6 @@ from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( _AsyncAbstractObjectStream, ) -from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.bidi_async import AsyncBidiRpc From 90d85977456529ed1ac30dd971b20a88b4c5256c Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 23 Sep 2025 05:38:26 +0000 Subject: [PATCH 22/24] remove unused import --- tests/unit/asyncio/test_async_read_object_stream.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index bc56a5d3f..28566084c 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -16,9 +16,6 @@ from unittest import mock from google.cloud import _storage_v2 -from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( - _AsyncAbstractObjectStream, -) from google.cloud.storage._experimental.asyncio.async_read_object_stream import ( _AsyncReadObjectStream, ) From 1473caaefcfd031bdc80639c065700a9f491bde6 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 23 Sep 2025 06:15:00 +0000 Subject: [PATCH 23/24] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- tests/unit/asyncio/test_async_read_object_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 740bb9f3e..d7170c4a4 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ 
b/tests/unit/asyncio/test_async_read_object_stream.py @@ -34,7 +34,6 @@ "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" ) def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc): - mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc" mock_client._client._transport._wrapped_methods = { "bidi_read_object_rpc": mock.sentinel.A From fdc3ab9136a01e9f3297d9ff2a8df7aa7db0db9f Mon Sep 17 00:00:00 2001 From: Chandra Sirimala Date: Tue, 23 Sep 2025 06:37:06 +0000 Subject: [PATCH 24/24] remove unused import --- .../storage/_experimental/asyncio/async_read_object_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 373ab003f..316a6750a 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -22,7 +22,7 @@ """ -from typing import Any, Optional +from typing import Optional from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (