Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
21e66a3
add AsyncAbstractObjectStream
chandra-siri Sep 17, 2025
39503f4
keep _AsyncAbstractObjectStream private
chandra-siri Sep 17, 2025
a161fd0
Add _AsyncReadObjectStream and it's stubs
chandra-siri Sep 17, 2025
dd862a2
complete __init__ for read_obj_str
chandra-siri Sep 17, 2025
aaabfd7
remove unuseful comments
chandra-siri Sep 17, 2025
23eea96
add methods open close send recv
chandra-siri Sep 17, 2025
71a7a79
change read_handle type from 'str' to 'bytes'
chandra-siri Sep 18, 2025
3c0fd66
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 18, 2025
0810afc
fix doc strings, add licence and type hints
chandra-siri Sep 18, 2025
d43d889
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
a14bc68
pass abstract methods
chandra-siri Sep 18, 2025
fbacbb4
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
fd37489
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
635ad07
add handle param
chandra-siri Sep 18, 2025
ba453d4
include handle in tests
chandra-siri Sep 18, 2025
800c6df
remove unit tests for abstract class
chandra-siri Sep 18, 2025
8b40812
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
18529ad
edit doc string for _AsyncReadObjectStream
chandra-siri Sep 18, 2025
df2532e
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
b4da1ac
refactor unit tests for async_read_object_stream
chandra-siri Sep 18, 2025
6dec6c6
bucket_name and object_name cannot be NONE
chandra-siri Sep 18, 2025
90a65f6
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
a154905
bucket_name and object_name cannot be None
chandra-siri Sep 18, 2025
d69cd63
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
20afca4
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
14a2aba
simplyfy tests for open
chandra-siri Sep 19, 2025
078afca
simply tests for send recv and close
chandra-siri Sep 19, 2025
2054989
minor edit - add bidi-stream in doc string
chandra-siri Sep 19, 2025
2b9ae2e
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 19, 2025
c06896c
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
916e7f1
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
0991383
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 19, 2025
f589b89
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
0e60694
add checks for invalid inputs
chandra-siri Sep 19, 2025
961def8
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
e9d0f9e
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 22, 2025
c4f61ea
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
dcb6a55
remove duplicated import
chandra-siri Sep 23, 2025
90d8597
remove unused import
chandra-siri Sep 23, 2025
229887b
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
73628f1
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 23, 2025
1473caa
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 23, 2025
fdc3ab9
remove unused import
chandra-siri Sep 23, 2025
5da65b1
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

"""

from typing import Any, Optional
from typing import Optional
from google.cloud import _storage_v2
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
Expand Down Expand Up @@ -94,15 +94,42 @@ def __init__(
)

async def open(self) -> None:
pass
"""Opens the bidi-gRPC connection to read from the object.

This method sends an initial request to start the stream and receives
the first response containing metadata and a read handle.
"""
await self.socket_like_rpc.open() # this is actually 1 send
response = await self.socket_like_rpc.recv()
if self.generation_number is None:
self.generation_number = response.metadata.generation

self.read_handle = response.read_handle

async def close(self) -> None:
pass
"""Closes the bidi-gRPC connection."""
await self.socket_like_rpc.close()

async def send(
self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest
) -> None:
pass

async def recv(self) -> Any:
pass
"""Sends a request message on the stream.

Args:
bidi_read_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`):
The request message to send. This is typically used to specify
the read offset and limit.
"""
await self.socket_like_rpc.send(bidi_read_object_request)

async def recv(self) -> _storage_v2.BidiReadObjectResponse:
"""Receives a response from the stream.

This method waits for the next message from the server, which could
contain object data or metadata.

Returns:
:class:`~google.cloud._storage_v2.types.BidiReadObjectResponse`:
The response message from the server.
"""
return await self.socket_like_rpc.recv()
127 changes: 110 additions & 17 deletions tests/unit/asyncio/test_async_read_object_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

import pytest
from unittest import mock
from unittest.mock import AsyncMock
from google.cloud import _storage_v2

from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
_AsyncReadObjectStream,
)

_TEST_BUCKET_NAME = "test-bucket"
_TEST_OBJECT_NAME = "test-object"
_TEST_GENERATION_NUMBER = 12345
_TEST_READ_HANDLE = b"test-read-handle"


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
Expand All @@ -28,27 +34,24 @@
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
# initialize with bucket, object_name and generation number. & client.
bucket_name = "test-bucket"
object_name = "test-object"
generation_number = 12345
mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc"
mock_client._client._transport._wrapped_methods = {
"bidi_read_object_rpc": mock.sentinel.A
}

read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=bucket_name,
object_name=object_name,
generation_number=generation_number,
)
full_bucket_name = f"projects/_/buckets/{bucket_name}"
full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}"
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
read_object_spec=_storage_v2.BidiReadObjectSpec(
bucket=full_bucket_name, object=object_name
bucket=full_bucket_name, object=_TEST_OBJECT_NAME
),
)

read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
generation_number=_TEST_GENERATION_NUMBER,
)

mock_async_bidi_rpc.assert_called_once_with(
mock.sentinel.A,
initial_request=first_bidi_read_req,
Expand All @@ -57,8 +60,98 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
assert read_obj_stream.socket_like_rpc is mock_async_bidi_rpc.return_value


def test_init_with_invalid_parameters():
"""Test the constructor of _AsyncReadObjectStream with invalid params."""
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_open(mock_client):
# arrange
read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
)
read_obj_stream.socket_like_rpc.open = AsyncMock()

recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
recv_response.read_handle = _TEST_READ_HANDLE
read_obj_stream.socket_like_rpc.recv = AsyncMock(return_value=recv_response)

# act
await read_obj_stream.open()

# assert
read_obj_stream.socket_like_rpc.open.assert_called_once()
read_obj_stream.socket_like_rpc.recv.assert_called_once()

assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
assert read_obj_stream.read_handle == _TEST_READ_HANDLE


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_close(mock_client):
# arrange
read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
)
read_obj_stream.socket_like_rpc.close = AsyncMock()

# act
await read_obj_stream.close()

# assert
read_obj_stream.socket_like_rpc.close.assert_called_once()


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_send(mock_client):
# arrange
read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
)
read_obj_stream.socket_like_rpc.send = AsyncMock()

# act
bidi_read_object_request = _storage_v2.BidiReadObjectRequest()
await read_obj_stream.send(bidi_read_object_request)

# assert
read_obj_stream.socket_like_rpc.send.assert_called_once_with(
bidi_read_object_request
)


@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
)
@pytest.mark.asyncio
async def test_recv(mock_client):
# arrange
read_obj_stream = _AsyncReadObjectStream(
client=mock_client,
bucket_name=_TEST_BUCKET_NAME,
object_name=_TEST_OBJECT_NAME,
)
bidi_read_object_response = _storage_v2.BidiReadObjectResponse()
read_obj_stream.socket_like_rpc.recv = AsyncMock(
return_value=bidi_read_object_response
)

# act
response = await read_obj_stream.recv()

with pytest.raises(ValueError):
_AsyncReadObjectStream(None, bucket_name=None, object_name=None)
# assert
read_obj_stream.socket_like_rpc.recv.assert_called_once()
assert response == bidi_read_object_response