1313# limitations under the License.
1414
1515from __future__ import annotations
16+ import google_crc32c
17+ from google .api_core import exceptions
18+ from google_crc32c import Checksum
1619
17- from typing import Any , List , Optional , Tuple
20+ from typing import List , Optional , Tuple
1821
1922from google .cloud .storage ._experimental .asyncio .async_read_object_stream import (
2023 _AsyncReadObjectStream ,
2427)
2528
2629from io import BytesIO
30+ from google .cloud import _storage_v2
31+ from google .cloud .storage .exceptions import DataCorruption
32+
33+
# Chunk size used by ``download_ranges``: the caller's ``read_ranges`` list is
# sliced into segments of at most this many entries, and each segment is sent
# as one ``BidiReadObjectRequest``.
34+ _MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
35+
36+
class Result:
    """Per-range outcome returned by ``download_ranges``.

    One instance is populated and returned for each ``read_range`` handed to
    the ``download_ranges`` method.

    :type bytes_requested: int
    :param bytes_requested: Number of bytes requested for the range. It is
        fixed at construction time and exposed read-only.
    """

    def __init__(self, bytes_requested: int):
        # Counter starts at zero and is updated through the
        # ``bytes_written`` setter.
        self._bytes_written: int = 0
        # Assigned only here; no setter is provided on purpose, so the value
        # cannot be edited through the public property afterwards.
        self._bytes_requested: int = bytes_requested

    @property
    def bytes_requested(self) -> int:
        """Number of bytes requested for this range (read-only)."""
        return self._bytes_requested

    @property
    def bytes_written(self) -> int:
        """Number of bytes recorded as written for this range."""
        return self._bytes_written

    @bytes_written.setter
    def bytes_written(self, value: int):
        self._bytes_written = value

    def __repr__(self):
        requested = self._bytes_requested
        written = self._bytes_written
        return f"bytes_requested: {requested}, bytes_written: {written}"
2763
2864
2965class AsyncMultiRangeDownloader :
@@ -38,21 +74,23 @@ class AsyncMultiRangeDownloader:
3874 mrd = await AsyncMultiRangeDownloader.create_mrd(
3975 client, bucket_name="chandrasiri-rs", object_name="test_open9"
4076 )
41- my_buff1 = BytesIO( )
77+ my_buff1 = open('my_fav_file.txt', 'wb' )
4278 my_buff2 = BytesIO()
4379 my_buff3 = BytesIO()
44- my_buff4 = BytesIO()
45- buffers = [my_buff1, my_buff2, my_buff3, my_buff4]
46- await mrd.download_ranges(
80+ my_buff4 = any_object_which_provides_BytesIO_like_interface()
81+ results_arr = await mrd.download_ranges(
4782 [
83+ # (start_byte, bytes_to_read, writeable_buffer)
4884 (0, 100, my_buff1),
49- (100, 200 , my_buff2),
50- (200, 300 , my_buff3),
51- (300, 400 , my_buff4),
85+ (100, 20 , my_buff2),
86+ (200, 123 , my_buff3),
87+ (300, 789 , my_buff4),
5288 ]
5389 )
54- for buff in buffers:
55- print("downloaded bytes", buff.getbuffer().nbytes)
90+
91+ for result in results_arr:
92+ print("downloaded bytes", result)
93+
5694
5795 """
5896
@@ -119,12 +157,23 @@ def __init__(
119157 :type read_handle: bytes
120158 :param read_handle: (Optional) An existing read handle.
121159 """
160+
161+ # Verify that the fast, C-accelerated version of crc32c is available.
162+ # If not, raise an error to prevent silent performance degradation.
163+ if google_crc32c .implementation != "c" :
164+ raise exceptions .NotFound (
165+ "The google-crc32c package is not installed with C support. "
166+ "Bidi reads require the C extension for data integrity checks."
167+ "For more information, see https://github.com/googleapis/python-crc32c."
168+ )
169+
122170 self .client = client
123171 self .bucket_name = bucket_name
124172 self .object_name = object_name
125173 self .generation_number = generation_number
126174 self .read_handle = read_handle
127- self .read_obj_str : _AsyncReadObjectStream = None
175+ self .read_obj_str : Optional [_AsyncReadObjectStream ] = None
176+ self ._is_stream_open : bool = False
128177
async def open(self) -> None:
    """Opens the bidi-gRPC connection to read from the object.

    "Opening" constitutes fetching object metadata such as generation number
    and read handle and storing them on this instance: the generation number
    is copied from the stream only when none was set beforehand, while the
    read handle is taken from the stream after every successful open.

    :raises ValueError: If the underlying stream is already open.
    """
    if self._is_stream_open:
        raise ValueError("Underlying bidi-gRPC stream is already open")

    # Build the stream object only once; a later re-open reuses it.
    stream = self.read_obj_str
    if stream is None:
        stream = _AsyncReadObjectStream(
            client=self.client,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            generation_number=self.generation_number,
            read_handle=self.read_handle,
        )
        self.read_obj_str = stream

    await stream.open()
    # Flag is flipped only after the stream opened without raising.
    self._is_stream_open = True

    if self.generation_number is None:
        self.generation_number = stream.generation_number
    self.read_handle = stream.read_handle
150204
async def download_ranges(
    self, read_ranges: List[Tuple[int, int, BytesIO]]
) -> List[Result]:
    """Downloads multiple byte ranges from the object into the buffers
    provided by user.

    :type read_ranges: List[Tuple[int, int, "BytesIO"]]
    :param read_ranges: A list of tuples, where each tuple represents a
        byte range (start_byte, bytes_to_read, writeable_buffer). Buffer has
        to be provided by the user, and user has to make sure appropriate
        memory is available in the application to avoid out-of-memory crash.

    :rtype: List[:class:`~google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Result`]
    :returns: A list of ``Result`` objects, where each object corresponds
        to a requested range, in the same order as ``read_ranges``.

    :raises ValueError: If more than 1000 ranges are passed, or if the
        underlying stream is not open.
    :raises DataCorruption: If the CRC32C computed locally for a received
        chunk differs from the checksum sent with it.
    :raises Exception: If ``recv`` yields ``None`` or a data range arrives
        without a ``read_range``.
    """
    if len(read_ranges) > 1000:
        raise ValueError(
            "Invalid input - length of read_ranges cannot be more than 1000"
        )

    if not self._is_stream_open:
        raise ValueError("Underlying bidi-gRPC stream is not open")

    # read_id -> destination buffer for every range still being received.
    pending_buffers = {}
    results = []
    segment_size = _MAX_READ_RANGES_PER_BIDI_READ_REQUEST

    for segment_start in range(0, len(read_ranges), segment_size):
        segment = read_ranges[segment_start : segment_start + segment_size]

        request_ranges = []
        # read_id is the range's position in ``read_ranges``, so it also
        # indexes ``results`` directly when responses come back.
        for read_id, read_range in enumerate(segment, start=segment_start):
            pending_buffers[read_id] = read_range[2]
            bytes_requested = read_range[1]
            results.append(Result(bytes_requested))
            request_ranges.append(
                _storage_v2.ReadRange(
                    read_offset=read_range[0],
                    read_length=bytes_requested,
                    read_id=read_id,
                )
            )

        # One request message per segment.
        await self.read_obj_str.send(
            _storage_v2.BidiReadObjectRequest(read_ranges=request_ranges)
        )

    # Keep receiving until every requested range has reported range_end.
    while pending_buffers:
        response = await self.read_obj_str.recv()

        if response is None:
            raise Exception("None response received, something went wrong.")

        for data_range in response.object_data_ranges:
            received_range = data_range.read_range
            if received_range is None:
                raise Exception("Invalid response, read_range is None")

            checksummed = data_range.checksummed_data
            data = checksummed.content
            server_checksum = checksummed.crc32c

            # Verify the chunk before anything is written to the buffer.
            client_checksum = int.from_bytes(Checksum(data).digest(), "big")
            if server_checksum != client_checksum:
                raise DataCorruption(
                    response,
                    f"Checksum mismatch for read_id {received_range.read_id}. "
                    f"Server sent {server_checksum}, client calculated {client_checksum}.",
                )

            read_id = received_range.read_id
            pending_buffers[read_id].write(data)
            results[read_id].bytes_written += len(data)

            if data_range.range_end:
                del pending_buffers[read_id]

    return results
289+
async def close(self):
    """Closes the underlying bidi-gRPC connection.

    :raises ValueError: If the underlying stream is not open.
    """
    if self._is_stream_open:
        await self.read_obj_str.close()
        # Only reached when the stream's close() did not raise.
        self._is_stream_open = False
        return
    raise ValueError("Underlying bidi-gRPC stream is not open")
298+
@property
def is_stream_open(self) -> bool:
    """Whether the underlying bidi-gRPC stream is currently flagged as open."""
    return self._is_stream_open
0 commit comments