Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 6adebc0

Browse files
committed
add init version of MultiRangeDownloader
1 parent 5105bfa commit 6adebc0

2 files changed

Lines changed: 141 additions & 2 deletions

File tree

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""
2+
Mrd_generic(bucket, obj,gen=None, read_handle=None)
3+
mrd = Mrd(bucket, obj, gen)
4+
5+
mrd = Mrd(bucket, obj)
6+
mrd = Mrd.create_from(client, bucket, obj)
7+
Mrd_generic(bucket, obj,gen=None, read_handle=None)
8+
* set attributes
9+
* instantiate read_object_strea
10+
* async stream.open
11+
mrd = Mrd(read_handle)
12+
mrd.download_ranges([(range_start, range_end, buf)])
13+
14+
mrr = await MultiRangeDownloader.create_mrd(client, bucket, obj)
15+
await mrr.download_ranges([(range_start, range_end, buf)])
16+
17+
18+
"""
19+
20+
from async_read_object_stream import AsyncReadObjectStream
21+
from async_grpc_client import AsyncGrpcClient
22+
from io import BytesIO
23+
from google.cloud import _storage_v2
24+
import sys
25+
import asyncio
26+
27+
28+
class MultiRangeDownloader:
29+
30+
@classmethod
31+
async def create_mrd(cls, client, bucket_name, object_name, generation_number=None):
32+
# inti
33+
# async mrd.open()
34+
mrd = cls(client, bucket_name, object_name, generation_number)
35+
await mrd.open()
36+
return mrd
37+
38+
@classmethod
39+
def create_mrd_from_read_handle(cls, client, read_handle):
40+
raise NotImplementedError("TODO")
41+
42+
def __init__(
43+
self,
44+
client,
45+
bucket_name=None,
46+
object_name=None,
47+
generation_number=None,
48+
read_handle=None, # open with rea
49+
):
50+
self.client = client
51+
self.bucket_name = bucket_name
52+
self.object_name = object_name
53+
self.generation_number = generation_number
54+
self.read_handle = read_handle
55+
56+
async def open(self):
57+
self.read_obj_str = AsyncReadObjectStream(
58+
client=self.client,
59+
bucket_name=self.bucket_name,
60+
object_name=self.object_name,
61+
generation_number=self.generation_number,
62+
read_handle=self.read_handle,
63+
)
64+
await self.read_obj_str.open()
65+
pass
66+
67+
async def download_ranges(self, ranges):
68+
first_range = ranges[0]
69+
start = first_range[0]
70+
end = first_range[1]
71+
buffer = first_range[2]
72+
# create bidiReadReq
73+
read_id = 1
74+
await self.read_obj_str.send(
75+
_storage_v2.BidiReadObjectRequest(
76+
read_ranges=[
77+
_storage_v2.ReadRange(
78+
read_offset=start, read_length=end, read_id=read_id
79+
)
80+
]
81+
)
82+
)
83+
# while read_end is not reached.
84+
read_ids_set = set()
85+
read_ids_set.add(read_id)
86+
bytes_received = 0
87+
while len(read_ids_set) > 0:
88+
response = await self.read_obj_str.recv()
89+
if response is None:
90+
print("None response received, something went wrong.")
91+
sys.exit(1)
92+
for object_data_chunk in response.object_data_ranges:
93+
data = object_data_chunk.checksummed_data.content
94+
buffer.write(data)
95+
print(data)
96+
print(object_data_chunk.checksummed_data.crc32c)
97+
98+
if object_data_chunk.read_range is not None:
99+
# bytes downloaded in this response.
100+
curr_iter_bytes = object_data_chunk.read_range.read_length
101+
bytes_received += curr_iter_bytes
102+
# if curr_iter_bytes != 2 * 1024 * 1024:
103+
# print(
104+
# "bytes received in current iter, for read_id",
105+
# curr_iter_bytes,
106+
# object_data_chunk.read_range.read_id,
107+
# )
108+
# print(
109+
# "bytes received in current iter, for read_id",
110+
# curr_iter_bytes,
111+
# object_data_chunk.read_range.read_id,
112+
# )
113+
114+
if (
115+
object_data_chunk.range_end is not None
116+
and object_data_chunk.range_end
117+
):
118+
# print(
119+
# f"Read ID {object_data_chunk.read_range.read_id} completed."
120+
# )
121+
read_ids_set.remove(object_data_chunk.read_range.read_id)
122+
print("downloaded bytes", bytes_received)
123+
124+
# pass
125+
126+
127+
async def test_mrd():
128+
client = AsyncGrpcClient()._grpc_client
129+
mrd = await MultiRangeDownloader.create_mrd(
130+
client, bucket_name="chandrasiri-rs", object_name="test_open9"
131+
)
132+
my_buff = BytesIO()
133+
await mrd.download_ranges([(0, 10, my_buff)])
134+
# print()
135+
print("downloaded bytes", my_buff.getbuffer().nbytes)
136+
137+
138+
if __name__ == "__main__":
139+
asyncio.run(test_mrd())

google/cloud/storage/_experimental/async_read_object_stream.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
"""
2727

2828

29-
class AsynReadObjectStream(AsyncAbstractObjectStream):
29+
class AsyncReadObjectStream(AsyncAbstractObjectStream):
3030
def __init__(
3131
self,
3232
client,
@@ -109,7 +109,7 @@ async def recv(self):
109109

110110
async def test(bucket_name, object_name):
111111
client = AsyncGrpcClient()._grpc_client
112-
async_read_obj_str = AsynReadObjectStream(
112+
async_read_obj_str = AsyncReadObjectStream(
113113
client, bucket_name=bucket_name, object_name=object_name
114114
)
115115
await async_read_obj_str.open()

0 commit comments

Comments
 (0)