diff --git a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py index 0ac229c5a..316a6750a 100644 --- a/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py +++ b/google/cloud/storage/_experimental/asyncio/async_read_object_stream.py @@ -22,7 +22,7 @@ """ -from typing import Any, Optional +from typing import Optional from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import ( @@ -94,15 +94,42 @@ def __init__( ) async def open(self) -> None: - pass + """Opens the bidi-gRPC connection to read from the object. + + This method sends an initial request to start the stream and receives + the first response containing metadata and a read handle. + """ + await self.socket_like_rpc.open() # this is actually 1 send + response = await self.socket_like_rpc.recv() + if self.generation_number is None: + self.generation_number = response.metadata.generation + + self.read_handle = response.read_handle async def close(self) -> None: - pass + """Closes the bidi-gRPC connection.""" + await self.socket_like_rpc.close() async def send( self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest ) -> None: - pass - - async def recv(self) -> Any: - pass + """Sends a request message on the stream. + + Args: + bidi_read_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`): + The request message to send. This is typically used to specify + the read offset and limit. + """ + await self.socket_like_rpc.send(bidi_read_object_request) + + async def recv(self) -> _storage_v2.BidiReadObjectResponse: + """Receives a response from the stream. + + This method waits for the next message from the server, which could + contain object data or metadata. + + Returns: + :class:`~google.cloud._storage_v2.types.BidiReadObjectResponse`: + The response message from the server. + """ + return await self.socket_like_rpc.recv() diff --git a/tests/unit/asyncio/test_async_read_object_stream.py b/tests/unit/asyncio/test_async_read_object_stream.py index 28566084c..d7170c4a4 100644 --- a/tests/unit/asyncio/test_async_read_object_stream.py +++ b/tests/unit/asyncio/test_async_read_object_stream.py @@ -14,12 +14,18 @@ import pytest from unittest import mock +from unittest.mock import AsyncMock from google.cloud import _storage_v2 from google.cloud.storage._experimental.asyncio.async_read_object_stream import ( _AsyncReadObjectStream, ) +_TEST_BUCKET_NAME = "test-bucket" +_TEST_OBJECT_NAME = "test-object" +_TEST_GENERATION_NUMBER = 12345 +_TEST_READ_HANDLE = b"test-read-handle" + @mock.patch( "google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc" @@ -28,27 +34,24 @@ "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" ) def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc): - # initialize with bucket, object_name and generation number. & client. - bucket_name = "test-bucket" - object_name = "test-object" - generation_number = 12345 mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc" mock_client._client._transport._wrapped_methods = { "bidi_read_object_rpc": mock.sentinel.A } - - read_obj_stream = _AsyncReadObjectStream( - client=mock_client, - bucket_name=bucket_name, - object_name=object_name, - generation_number=generation_number, - ) - full_bucket_name = f"projects/_/buckets/{bucket_name}" + full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}" first_bidi_read_req = _storage_v2.BidiReadObjectRequest( read_object_spec=_storage_v2.BidiReadObjectSpec( - bucket=full_bucket_name, object=object_name + bucket=full_bucket_name, object=_TEST_OBJECT_NAME ), ) + + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + generation_number=_TEST_GENERATION_NUMBER, + ) + mock_async_bidi_rpc.assert_called_once_with( mock.sentinel.A, initial_request=first_bidi_read_req, @@ -57,8 +60,98 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc): assert read_obj_stream.socket_like_rpc is mock_async_bidi_rpc.return_value -def test_init_with_invalid_parameters(): - """Test the constructor of _AsyncReadObjectStream with invalid params.""" +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_open(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + read_obj_stream.socket_like_rpc.open = AsyncMock() + + recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse) + recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object) + recv_response.metadata.generation = _TEST_GENERATION_NUMBER + recv_response.read_handle = _TEST_READ_HANDLE + read_obj_stream.socket_like_rpc.recv = AsyncMock(return_value=recv_response) + + # act + await read_obj_stream.open() + + # assert + read_obj_stream.socket_like_rpc.open.assert_called_once() + read_obj_stream.socket_like_rpc.recv.assert_called_once() + + assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER + assert read_obj_stream.read_handle == _TEST_READ_HANDLE + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_close(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + read_obj_stream.socket_like_rpc.close = AsyncMock() + + # act + await read_obj_stream.close() + + # assert + read_obj_stream.socket_like_rpc.close.assert_called_once() + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_send(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + read_obj_stream.socket_like_rpc.send = AsyncMock() + + # act + bidi_read_object_request = _storage_v2.BidiReadObjectRequest() + await read_obj_stream.send(bidi_read_object_request) + + # assert + read_obj_stream.socket_like_rpc.send.assert_called_once_with( + bidi_read_object_request + ) + + +@mock.patch( + "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client" +) +@pytest.mark.asyncio +async def test_recv(mock_client): + # arrange + read_obj_stream = _AsyncReadObjectStream( + client=mock_client, + bucket_name=_TEST_BUCKET_NAME, + object_name=_TEST_OBJECT_NAME, + ) + bidi_read_object_response = _storage_v2.BidiReadObjectResponse() + read_obj_stream.socket_like_rpc.recv = AsyncMock( + return_value=bidi_read_object_response + ) + + # act + response = await read_obj_stream.recv() - with pytest.raises(ValueError): - _AsyncReadObjectStream(None, bucket_name=None, object_name=None) + # assert + read_obj_stream.socket_like_rpc.recv.assert_called_once() + assert response == bidi_read_object_response