Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:
"""
if not self._is_stream_open:
raise ValueError("Stream is not open")
return await self.socket_like_rpc.recv()
response = await self.socket_like_rpc.recv()
# Update read_handle if present in response
if response and response.read_handle:
self.read_handle = response.read_handle
return response

@property
def is_stream_open(self) -> bool:
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/asyncio/test_async_read_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from unittest.mock import AsyncMock
from google.cloud import _storage_v2

from google.cloud.storage._experimental.asyncio import async_read_object_stream
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
_AsyncReadObjectStream,
)
Expand Down Expand Up @@ -273,3 +274,49 @@ async def test_recv_without_open_should_raise_error(

# assert
assert str(exc.value) == "Stream is not open"

@mock.patch("google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc")
@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client")
@pytest.mark.asyncio
async def test_recv_updates_read_handle_on_refresh(mock_client, mock_cls_async_bidi_rpc):
"""
Verify that the `recv` method correctly updates the stream's handle
when a new one is provided in a server response.
"""
# Arrange
socket_like_rpc = AsyncMock()
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
socket_like_rpc.open = AsyncMock()

initial_handle = _storage_v2.BidiReadHandle(handle=b"initial-handle-token")
response_with_initial_handle = _storage_v2.BidiReadObjectResponse(read_handle=initial_handle)
response_without_handle = _storage_v2.BidiReadObjectResponse(read_handle=None)

refreshed_handle = _storage_v2.BidiReadHandle(handle=b"new-refreshed-handle-token")
response_with_refreshed_handle = _storage_v2.BidiReadObjectResponse(read_handle=refreshed_handle)

socket_like_rpc.recv.side_effect = [
response_with_initial_handle,
response_without_handle,
response_with_refreshed_handle,
]

starting_handle = _storage_v2.BidiReadHandle(handle=b"starting-handle-token")
stream = async_read_object_stream._AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
read_handle=starting_handle,
)

# Act & Assert
Comment thread
Pulkit0110 marked this conversation as resolved.
assert stream.read_handle == starting_handle

await stream.open()
assert stream.read_handle == initial_handle

await stream.recv()
assert stream.read_handle == initial_handle

await stream.recv()
assert stream.read_handle == refreshed_handle