Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,32 @@
from io import BytesIO
from google.cloud import _storage_v2

import asyncio
import traceback
import logging
import datetime
import time

# Read ranges are sent in segments of at most this many entries per
# BidiReadObjectRequest.
_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100

# NOTE(review): this configures the root logger as a side effect of importing
# the module; presumably debugging scaffolding - confirm before merging.
logging.basicConfig(
    format="%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)


def my_time(time_seconds: float) -> str:
    """
    Format the output of ``time.time()`` as a human readable timestamp.

    The timestamp is rendered in the local timezone with microsecond
    resolution (six fractional digits).
    For example, "2023-10-05 14:23:45.123456"

    :param time_seconds: seconds since the epoch, as returned by ``time.time()``
    :return: str: timestamp formatted as "YYYY-MM-DD HH:MM:SS.ffffff"
    """
    ts = datetime.datetime.fromtimestamp(time_seconds)
    return ts.strftime("%Y-%m-%d %H:%M:%S.%f")


class Result:
Expand Down Expand Up @@ -160,6 +184,9 @@ def __init__(
self.read_handle = read_handle
self.read_obj_str: Optional[_AsyncReadObjectStream] = None
self._is_stream_open: bool = False
self.read_id_to_writable_buffer_dict = {}
self.read_id_to_func_offset = {}
self.func_id_to_pending_read_ids = {}

async def open(self) -> None:
"""Opens the bidi-gRPC connection to read from the object.
Expand Down Expand Up @@ -189,8 +216,8 @@ async def open(self) -> None:
return

async def download_ranges(
self, read_ranges: List[Tuple[int, int, BytesIO]]
) -> List[Result]:
self, read_ranges: List[Tuple[int, int, BytesIO]], func_offest: int, lock
):
"""Downloads multiple byte ranges from the object into the buffers
provided by user.

Expand All @@ -214,32 +241,71 @@ async def download_ranges(
if not self._is_stream_open:
raise ValueError("Underlying bidi-gRPC stream is not open")

read_id_to_writable_buffer_dict = {}
results = []
read_ids_in_this_func = set()
for i in range(0, len(read_ranges), _MAX_READ_RANGES_PER_BIDI_READ_REQUEST):
read_ranges_segment = read_ranges[
i : i + _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
]

read_ranges_for_bidi_req = []
for j, read_range in enumerate(read_ranges_segment):
read_id = i + j
read_id_to_writable_buffer_dict[read_id] = read_range[2]
read_id = i + j + func_offest
self.read_id_to_func_offset[read_id] = func_offest
self.read_id_to_writable_buffer_dict[read_id] = read_range[2]
read_ids_in_this_func.add(read_id)
bytes_requested = read_range[1]
results.append(Result(bytes_requested))
# results.append(Result(bytes_requested))
read_ranges_for_bidi_req.append(
_storage_v2.ReadRange(
read_offset=read_range[0],
read_length=bytes_requested,
read_id=read_id,
)
)
# logger.debug("sending read_ranges in func_id %d: ", func_offest)

print(
my_time(time.time()),
"sending read_ranges in funcId: ",
func_offest,
# read_ranges_for_bidi_req,
)
await self.read_obj_str.send(
_storage_v2.BidiReadObjectRequest(read_ranges=read_ranges_for_bidi_req)
)

while len(read_id_to_writable_buffer_dict) > 0:
response = await self.read_obj_str.recv()
self.func_id_to_pending_read_ids[func_offest] = read_ids_in_this_func
# if func_offest == 100:
# await asyncio.sleep(15)
# print("woke up from sleep, recveing to start")
# async with lock:
recv_count = 0
while len(self.func_id_to_pending_read_ids[func_offest]) > 0:
try:
async with asyncio.timeout(10):
# async with lock:
print(
my_time(time.time()),
"receiving read_ranges in func_id: ",
func_offest,
"recv_count:",
recv_count,
)
response = await self.read_obj_str.recv()
print(
my_time(time.time()),
"received read_ranges in func_id: ",
func_offest,
"recv_count:",
recv_count,
)
except TimeoutError as exc:
print("timeout error occurred, Traceback:", traceback.format_exc())
print("in funcId", func_offest)
for read_id, buffer in self.read_id_to_writable_buffer_dict.items():
print("read_id", read_id, "buffer size", buffer.getbuffer().nbytes)
print("*" * 40)
continue

if response is None:
raise Exception("None response received, something went wrong.")
Expand All @@ -248,18 +314,17 @@ async def download_ranges(
if object_data_range.read_range is None:
raise Exception("Invalid response, read_range is None")

data = object_data_range.checksummed_data.content
read_id = object_data_range.read_range.read_id
buffer = read_id_to_writable_buffer_dict[read_id]
# print("received read_id", read_id)
data = object_data_range.checksummed_data.content
buffer = self.read_id_to_writable_buffer_dict[read_id]
buffer.write(data)
results[read_id].bytes_written += len(data)

if object_data_range.range_end:
del read_id_to_writable_buffer_dict[
object_data_range.read_range.read_id
]

return results
tmp_func_offset = self.read_id_to_func_offset[read_id]
self.func_id_to_pending_read_ids[tmp_func_offset].remove(read_id)
recv_count += 1
return

async def close(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ async def recv(self) -> _storage_v2.BidiReadObjectResponse:
"""
if not self._is_stream_open:
raise ValueError("Stream is not open")
return await self.socket_like_rpc.recv()
res = await self.socket_like_rpc.recv()
# if hasattr(res, "object_data_ranges"):
# for object_data_range in res.object_data_ranges:
# print("readid in read_obj_str", object_data_range.read_range.read_id)
return res

@property
def is_stream_open(self) -> bool:
Expand Down
97 changes: 97 additions & 0 deletions samples/snippets/experimental/storage_async_download_ranges.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)
from google.cloud.storage._experimental.asyncio.async_grpc_client import (
AsyncGrpcClient,
)
from io import BytesIO
import asyncio
import argparse
import logging
from grpc._cython import cygrpc


# Emit everything at DEBUG level with timestamps and source locations so the
# sample's interaction with gRPC can be followed in the output.
logging.basicConfig(
    format="%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
# Explicitly enable debug output for the gRPC asyncio call logger.
logging.getLogger("grpc.aio._call").setLevel(logging.DEBUG)
# logging.getLogger(
# "google.cloud.storage._experimental.asyncio.async_multi_range_downloader"
# ).setLevel(logging.DEBUG)


async def test_mrd_by_tasks(bucket_name, object_name, generation_number=None):
    """Download a few byte ranges of one object through an asyncio task.

    Opens an ``AsyncMultiRangeDownloader`` for the object, schedules
    ``download_ranges`` as a task, waits for that task to finish and then
    prints the size of every buffer registered on the downloader. The
    downloader is closed before returning, even if the download fails.

    :param bucket_name: name of the bucket that holds the object.
    :param object_name: name of the object to read from.
    :param generation_number: optional object generation; passed through to
        the downloader unchanged.
    """
    client = AsyncGrpcClient()._grpc_client

    mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name, generation_number)
    await mrd.open()

    try:
        # Each entry is (read offset, number of bytes, destination buffer).
        ranges1 = [
            (0, 100, BytesIO()),
            (100, 20, BytesIO()),
            (200, 123, BytesIO()),
            (300, 789, BytesIO()),
        ]

        # NOTE: awaiting `download_ranges` directly works fine. Running it as
        # a task was observed to hang in `self._cython_call.status()` in
        # `grpc/aio/_call.py` unless the receive is wrapped in
        # `asyncio.timeout(...)`; see `download_ranges` in
        # `async_multi_range_downloader`.
        lock = asyncio.Lock()
        task1 = asyncio.create_task(mrd.download_ranges(ranges1, 1000, lock))
        print("task1 loopid ", id(task1.get_loop()))

        # The task has to be awaited: otherwise this coroutine returns before
        # the download ever runs and asyncio.run() cancels the pending task.
        await task1

        print("downloading complete: Buffer details")
        for read_id, buffer in mrd.read_id_to_writable_buffer_dict.items():
            print(read_id, buffer.getbuffer().nbytes)
    finally:
        # Always release the stream opened above.
        await mrd.close()


if __name__ == "__main__":
    # Command-line entry point: read the object coordinates from the
    # arguments and run the download sample to completion.
    cli = argparse.ArgumentParser()
    cli.add_argument("--bucket_name", type=str, required=True)
    cli.add_argument("--object_name", type=str, required=True)
    cli.add_argument("--generation_number", type=int, default=None)
    cli_args = cli.parse_args()

    sample_coro = test_mrd_by_tasks(
        cli_args.bucket_name,
        cli_args.object_name,
        cli_args.generation_number,
    )
    asyncio.run(sample_coro)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"google-auth >= 2.26.1, < 3.0.0",
"google-api-core >= 2.15.0, < 3.0.0",
"google-cloud-core >= 2.4.2, < 3.0.0",
"grpcio==1.77.0.dev0",
# The dependency "google-resumable-media" is no longer used. However, the
# dependency is still included here to accommodate users who may be
# importing exception classes from the google-resumable-media without
Expand Down