Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
21e66a3
add AsyncAbstractObjectStream
chandra-siri Sep 17, 2025
39503f4
keep _AsyncAbstractObjectStream private
chandra-siri Sep 17, 2025
a161fd0
Add _AsyncReadObjectStream and it's stubs
chandra-siri Sep 17, 2025
dd862a2
complete __init__ for read_obj_str
chandra-siri Sep 17, 2025
aaabfd7
remove unuseful comments
chandra-siri Sep 17, 2025
23eea96
add methods open close send recv
chandra-siri Sep 17, 2025
71a7a79
change read_handle type from 'str' to 'bytes'
chandra-siri Sep 18, 2025
827aec0
feat: add async_multi_range_downloader
chandra-siri Sep 18, 2025
5be7469
fix: read_ranges should have buffer as well
chandra-siri Sep 18, 2025
b3ad551
rename MultiRangeDownloader to AsyncMultiRangeDownloader
chandra-siri Sep 18, 2025
a87f2be
feat: implement download_ranges method
chandra-siri Sep 18, 2025
c2e3c7b
add BytesIO in doc string
chandra-siri Sep 18, 2025
3c0fd66
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 18, 2025
0810afc
fix doc strings, add licence and type hints
chandra-siri Sep 18, 2025
d43d889
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
a14bc68
pass abstract methods
chandra-siri Sep 18, 2025
fbacbb4
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
fd37489
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
635ad07
add handle param
chandra-siri Sep 18, 2025
ba453d4
include handle in tests
chandra-siri Sep 18, 2025
800c6df
remove unit tests for abstract class
chandra-siri Sep 18, 2025
8b40812
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
18529ad
edit doc string for _AsyncReadObjectStream
chandra-siri Sep 18, 2025
df2532e
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
b4da1ac
refactor unit tests for async_read_object_stream
chandra-siri Sep 18, 2025
6dec6c6
bucket_name and object_name cannot be NONE
chandra-siri Sep 18, 2025
90a65f6
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 18, 2025
a154905
bucket_name and object_name cannot be None
chandra-siri Sep 18, 2025
d69cd63
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 18, 2025
20afca4
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
14a2aba
simplyfy tests for open
chandra-siri Sep 19, 2025
078afca
simply tests for send recv and close
chandra-siri Sep 19, 2025
2054989
minor edit - add bidi-stream in doc string
chandra-siri Sep 19, 2025
2b9ae2e
Merge branch 'bidi_reads_1_abs_obj_stream' of github.com:googleapis/p…
chandra-siri Sep 19, 2025
c06896c
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
916e7f1
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
3658502
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
8366f0b
simplify unit tests
chandra-siri Sep 19, 2025
4a774ec
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
0991383
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 19, 2025
f589b89
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
06b102c
improve doc string
chandra-siri Sep 19, 2025
52494b4
fix unit tess in MRD
chandra-siri Sep 19, 2025
8e00701
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
0e60694
add checks for invalid inputs
chandra-siri Sep 19, 2025
961def8
Merge branch 'bidi_reads_2_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 19, 2025
e9d0f9e
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 22, 2025
c4f61ea
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
ef5d917
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
7fbf28c
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 22, 2025
dcb6a55
remove duplicated import
chandra-siri Sep 23, 2025
90d8597
remove unused import
chandra-siri Sep 23, 2025
229887b
Merge branch 'bidi_reads_3_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
5a6ffc7
Merge branch 'bidi_reads_4_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
9f15551
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
234336f
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 23, 2025
521154c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 23, 2025
8338ab2
fix unit test
chandra-siri Sep 23, 2025
c3cb076
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
ac8e651
remove unused import
chandra-siri Sep 23, 2025
c6b1098
Merge branch 'bidi_reads_5_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 23, 2025
6469063
implement basic functionality for download_ranges
chandra-siri Sep 24, 2025
17404af
Merge branch 'main' of github.com:googleapis/python-storage into bidi…
chandra-siri Sep 24, 2025
130046f
doc string for further testcase
chandra-siri Sep 24, 2025
0c61e87
remove unwanted comments
chandra-siri Sep 24, 2025
027758c
remove testing code
chandra-siri Sep 24, 2025
570983f
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 24, 2025
99ffe2d
fix doc strings
chandra-siri Sep 24, 2025
e571dd1
Merge branch 'bidi_reads_6_read_obj_stream' of github.com:googleapis/…
chandra-siri Sep 24, 2025
fdaf1ae
update doc string to describe `read_ranges` format
chandra-siri Sep 25, 2025
112cb68
add test case for ranges > 1000
chandra-siri Sep 26, 2025
b56efd9
don't return exception object , raise instead
chandra-siri Sep 29, 2025
890eac1
don't return exception object , raise instead
chandra-siri Sep 29, 2025
84b63f1
correct doc string rtype
chandra-siri Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations

from typing import Any, List, Optional, Tuple
from typing import List, Optional, Tuple

from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
_AsyncReadObjectStream,
Expand All @@ -24,6 +24,38 @@
)

from io import BytesIO
from google.cloud import _storage_v2


_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100


class Result:
    """Per-range outcome returned by ``download_ranges``.

    One instance is created for every `read_range` passed to
    ``download_ranges``. It records how many bytes were requested for that
    range and how many bytes have been written to the caller's buffer.
    """

    def __init__(self, bytes_requested: int) -> None:
        # The requested size is fixed at construction time, which is why it
        # is exposed through a read-only property. Only the written count is
        # meant to change afterwards.
        self._bytes_written: int = 0
        self._bytes_requested: int = bytes_requested

    @property
    def bytes_requested(self) -> int:
        """Number of bytes requested for this range (read-only)."""
        return self._bytes_requested

    @property
    def bytes_written(self) -> int:
        """Number of bytes written to the buffer so far."""
        return self._bytes_written

    @bytes_written.setter
    def bytes_written(self, value: int) -> None:
        self._bytes_written = value

    def __repr__(self) -> str:
        return "bytes_requested: {}, bytes_written: {}".format(
            self._bytes_requested, self._bytes_written
        )


class AsyncMultiRangeDownloader:
Expand All @@ -38,21 +70,30 @@ class AsyncMultiRangeDownloader:
mrd = await AsyncMultiRangeDownloader.create_mrd(
client, bucket_name="chandrasiri-rs", object_name="test_open9"
)
my_buff1 = BytesIO()
my_buff1 = open('my_fav_file.txt', 'wb')
Comment thread
chandra-siri marked this conversation as resolved.
my_buff2 = BytesIO()
my_buff3 = BytesIO()
my_buff4 = BytesIO()
buffers = [my_buff1, my_buff2, my_buff3, my_buff4]
await mrd.download_ranges(
my_buff4 = any_object_which_provides_BytesIO_like_interface()
results_arr, error_obj = await mrd.download_ranges(
[
# (start_byte, bytes_to_read, writeable_buffer)
(0, 100, my_buff1),
(100, 200, my_buff2),
(200, 300, my_buff3),
(300, 400, my_buff4),
(100, 20, my_buff2),
(200, 123, my_buff3),
(300, 789, my_buff4),
]
)
for buff in buffers:
print("downloaded bytes", buff.getbuffer().nbytes)
if error_obj:
print("Error occurred: ")
print(error_obj)
print(
"please issue call to `download_ranges` with updated "
"`read_ranges` based on diff of (bytes_requested - bytes_written)"
)

for result in results_arr:
print("downloaded bytes", result)


"""

Expand Down Expand Up @@ -148,18 +189,70 @@ async def open(self) -> None:
self.read_handle = self.read_obj_str.read_handle
return

async def download_ranges(
    self, read_ranges: List[Tuple[int, int, BytesIO]]
) -> List[Result]:
    """Downloads multiple byte ranges from the object into the buffers
    provided by user.

    :type read_ranges: List[Tuple[int, int, "BytesIO"]]
    :param read_ranges: A list of tuples, where each tuple represents a
        byte range (start_byte, bytes_to_read, writeable_buffer). Buffer has
        to be provided by the user, and user has to make sure appropriate
        memory is available in the application to avoid out-of-memory crash.
        At most 1000 ranges may be passed in a single call.

    :rtype: List[:class:`~google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Result`]
    :returns: A list of ``Result`` objects, where each object corresponds
        to a requested range, in the same order as ``read_ranges``.

    :raises ValueError: If ``read_ranges`` contains more than 1000 entries.
    :raises Exception: If the stream yields ``None`` instead of a response,
        or a response carries a data range whose ``read_range`` is ``None``.
    """
    if len(read_ranges) > 1000:
        raise ValueError(
            "Invalid input - length of read_ranges cannot be more than 1000"
        )

    # Maps read_id -> writable buffer for every range that has not been
    # completely received yet. The read_id is the index of the range in
    # ``read_ranges``, so it also indexes ``results``.
    pending_buffers = {}
    results = []

    # Ranges are sent in segments of at most
    # _MAX_READ_RANGES_PER_BIDI_READ_REQUEST per request message.
    for segment_start in range(
        0, len(read_ranges), _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
    ):
        segment = read_ranges[
            segment_start : segment_start + _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
        ]

        read_ranges_for_bidi_req = []
        for offset_in_segment, read_range in enumerate(segment):
            read_id = segment_start + offset_in_segment
            pending_buffers[read_id] = read_range[2]
            bytes_requested = read_range[1]
            results.append(Result(bytes_requested))
            read_ranges_for_bidi_req.append(
                _storage_v2.ReadRange(
                    read_offset=read_range[0],
                    read_length=bytes_requested,
                    read_id=read_id,
                )
            )
        await self.read_obj_str.send(
            _storage_v2.BidiReadObjectRequest(read_ranges=read_ranges_for_bidi_req)
        )

    # Keep receiving until every requested range has been marked finished.
    while pending_buffers:
        response = await self.read_obj_str.recv()

        if response is None:
            raise Exception("None response received, something went wrong.")

        for object_data_range in response.object_data_ranges:
            if object_data_range.read_range is None:
                raise Exception("Invalid response, read_range is None")

            data = object_data_range.checksummed_data.content
            read_id = object_data_range.read_range.read_id
            pending_buffers[read_id].write(data)
            results[read_id].bytes_written += len(data)

            # ``range_end`` marks the last chunk for this read_id, so the
            # range no longer needs to be waited on.
            if object_data_range.range_end:
                del pending_buffers[read_id]
    return results
165 changes: 124 additions & 41 deletions tests/unit/asyncio/test_async_multi_range_downloader.py
Comment thread
chandra-siri marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pytest
from unittest import mock
from unittest.mock import AsyncMock
from google.cloud import _storage_v2

from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
Expand All @@ -28,52 +29,134 @@
_TEST_READ_HANDLE = b"test-handle"


class TestAsyncMultiRangeDownloader:
    """Unit tests for ``AsyncMultiRangeDownloader``."""

    async def _make_mock_mrd(
        self,
        mock_grpc_client,
        mock_cls_async_read_object_stream,
        bucket_name=_TEST_BUCKET_NAME,
        object_name=_TEST_OBJECT_NAME,
        generation_number=_TEST_GENERATION_NUMBER,
        read_handle=_TEST_READ_HANDLE,
    ):
        """Create an ``AsyncMultiRangeDownloader`` backed by a mocked stream.

        This is a helper, not a test, so it carries no pytest marker. The
        mocked stream always reports the module-level test generation number
        and read handle, regardless of the values passed to ``create_mrd``.
        """
        mock_stream = mock_cls_async_read_object_stream.return_value
        mock_stream.open = AsyncMock()
        mock_stream.generation_number = _TEST_GENERATION_NUMBER
        mock_stream.read_handle = _TEST_READ_HANDLE

        mrd = await AsyncMultiRangeDownloader.create_mrd(
            mock_grpc_client, bucket_name, object_name, generation_number, read_handle
        )

        return mrd

    def create_read_ranges(self, num_ranges):
        """Build ``num_ranges`` one-byte ranges in the shape ``download_ranges``
        accepts: ``(start_byte, bytes_to_read, writeable_buffer)`` tuples.
        """
        return [(i, 1, BytesIO()) for i in range(num_ranges)]

    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
    )
    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_create_mrd(
        self, mock_grpc_client, mock_cls_async_read_object_stream
    ):
        # Arrange & Act
        mrd = await self._make_mock_mrd(
            mock_grpc_client, mock_cls_async_read_object_stream
        )

        # Assert
        mock_cls_async_read_object_stream.assert_called_once_with(
            client=mock_grpc_client,
            bucket_name=_TEST_BUCKET_NAME,
            object_name=_TEST_OBJECT_NAME,
            generation_number=_TEST_GENERATION_NUMBER,
            read_handle=_TEST_READ_HANDLE,
        )

        mrd.read_obj_str.open.assert_called_once()

        assert mrd.client == mock_grpc_client
        assert mrd.bucket_name == _TEST_BUCKET_NAME
        assert mrd.object_name == _TEST_OBJECT_NAME
        assert mrd.generation_number == _TEST_GENERATION_NUMBER
        assert mrd.read_handle == _TEST_READ_HANDLE

    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
    )
    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_download_ranges(
        self, mock_grpc_client, mock_cls_async_read_object_stream
    ):
        # Arrange
        mock_mrd = await self._make_mock_mrd(
            mock_grpc_client, mock_cls_async_read_object_stream
        )
        mock_mrd.read_obj_str.send = AsyncMock()
        mock_mrd.read_obj_str.recv = AsyncMock()
        # A single response that delivers the whole range and ends it.
        mock_mrd.read_obj_str.recv.return_value = _storage_v2.BidiReadObjectResponse(
            object_data_ranges=[
                _storage_v2.ObjectRangeData(
                    checksummed_data=_storage_v2.ChecksummedData(
                        content=b"these_are_18_chars", crc32c=123
                    ),
                    range_end=True,
                    read_range=_storage_v2.ReadRange(
                        read_offset=0, read_length=18, read_id=0
                    ),
                )
            ],
        )

        # Act
        buffer = BytesIO()
        results = await mock_mrd.download_ranges([(0, 18, buffer)])

        # Assert
        mock_mrd.read_obj_str.send.assert_called_once_with(
            _storage_v2.BidiReadObjectRequest(
                read_ranges=[
                    _storage_v2.ReadRange(read_offset=0, read_length=18, read_id=0)
                ]
            )
        )
        assert len(results) == 1
        assert results[0].bytes_requested == 18
        assert results[0].bytes_written == 18
        assert buffer.getvalue() == b"these_are_18_chars"

    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_downloading_ranges_with_more_than_1000_should_throw_error(
        self, mock_grpc_client
    ):
        # Arrange
        mrd = AsyncMultiRangeDownloader(
            mock_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
        )

        # Act + Assert
        with pytest.raises(ValueError) as exc:
            await mrd.download_ranges(self.create_read_ranges(1001))

        # Assert
        assert (
            str(exc.value)
            == "Invalid input - length of read_ranges cannot be more than 1000"
        )