class AsyncAbstractObjectStream(abc.ABC):
    """Abstract base for asynchronous object streams.

    Intended as the common parent of both the read-object stream and the
    write-object stream. It only records which object the stream refers to;
    every stream operation is left to the concrete subclass.

    Attributes:
        bucket_name: Name of the bucket, stored exactly as passed in.
        object_name: Name of the object, stored exactly as passed in.
        generation_number: Object generation, or ``None`` when not given.
    """

    def __init__(self, bucket_name, object_name, generation_number=None):
        """Record the identity of the object this stream is bound to.

        Args:
            bucket_name: Name of the bucket.
            object_name: Name of the object.
            generation_number: Optional object generation.
        """
        super().__init__()
        self.bucket_name, self.object_name = bucket_name, object_name
        self.generation_number = generation_number

    @abc.abstractmethod
    async def open(self):
        """Open the stream. Concrete subclasses must override this."""
        raise NotImplementedError("Subclasses should implement this method.")

    @abc.abstractmethod
    async def close(self):
        """Close the stream. Concrete subclasses must override this."""
        raise NotImplementedError("Subclasses should implement this method.")

    @abc.abstractmethod
    async def send(self):
        """Send on the stream. Concrete subclasses must override this."""
        raise NotImplementedError("Subclasses should implement this method.")

    @abc.abstractmethod
    async def recv(self):
        """Receive from the stream. Concrete subclasses must override this."""
        raise NotImplementedError("Subclasses should implement this method.")
class MultiRangeDownloader:
    """Download several byte ranges of one object over a single bidi stream.

    Typical use::

        mrd = await MultiRangeDownloader.create_mrd(client, bucket, obj)
        await mrd.download_ranges([(start, end, writable_buffer), ...])

    Attributes:
        client: Client object handed to ``AsyncReadObjectStream``.
        bucket_name: Bucket name, as passed in.
        object_name: Object name, as passed in.
        generation_number: Object generation; filled in from the stream by
            :meth:`open` when it was not supplied.
        read_handle: Read handle; refreshed from the stream by :meth:`open`.
        read_obj_str: The underlying ``AsyncReadObjectStream``, or ``None``
            until :meth:`open` has run.
    """

    # Upper bound on the number of ranges accepted by one download_ranges()
    # call (the per-request bound is the module-level constant).
    _MAX_READ_RANGES_PER_CALL = 1000

    @classmethod
    async def create_mrd(cls, client, bucket_name, object_name, generation_number=None):
        """Construct a downloader and open its stream.

        Args:
            client: Client object handed to ``AsyncReadObjectStream``.
            bucket_name: Bucket name.
            object_name: Object name.
            generation_number: Optional object generation.

        Returns:
            MultiRangeDownloader: An instance whose stream is already open.
        """
        mrd = cls(client, bucket_name, object_name, generation_number)
        await mrd.open()
        return mrd

    @classmethod
    def create_mrd_from_read_handle(cls, client, read_handle):
        """Create a downloader from an existing read handle (not implemented).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("TODO")

    def __init__(
        self,
        client,
        bucket_name=None,
        object_name=None,
        generation_number=None,
        read_handle=None,
    ):
        """Store the target object's identity; no I/O happens here.

        Args:
            client: Client object handed to ``AsyncReadObjectStream``.
            bucket_name: Bucket name.
            object_name: Object name.
            generation_number: Optional object generation.
            read_handle: Optional read handle forwarded to the stream.
        """
        self.client = client
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.generation_number = generation_number
        self.read_handle = read_handle
        # Set by open(); kept as None so that use-before-open can be detected
        # with a clear error instead of an AttributeError.
        self.read_obj_str = None

    async def open(self):
        """Create and open the underlying read stream.

        After the stream is open, ``generation_number`` (when it was ``None``)
        and ``read_handle`` are copied from the stream.
        """
        self.read_obj_str = AsyncReadObjectStream(
            client=self.client,
            bucket_name=self.bucket_name,
            object_name=self.object_name,
            generation_number=self.generation_number,
            read_handle=self.read_handle,
        )
        await self.read_obj_str.open()
        if self.generation_number is None:
            self.generation_number = self.read_obj_str.generation_number
        self.read_handle = self.read_obj_str.read_handle

    async def download_ranges(self, read_ranges):
        """Download the given ranges into their buffers.

        Args:
            read_ranges: Sequence of ``(start, end, buffer)`` entries. The
                requested length is ``end - start``; received bytes are
                passed to ``buffer.write``. At most 1000 entries.

        Raises:
            ValueError: If more than 1000 ranges are given, if a range has
                ``end < start``, or if the stream was never opened.
            RuntimeError: If the stream yields ``None``, a data range without
                a ``read_range``, or data for a read id that is not pending.
        """
        if len(read_ranges) > self._MAX_READ_RANGES_PER_CALL:
            raise ValueError("Invalid Input - ranges cannot be more than 1000")
        if not read_ranges:
            # Nothing to request; matches the historical no-op behaviour.
            return

        # Validate everything up front so a bad entry cannot leave the stream
        # with only part of the batch already sent.
        for position, read_range in enumerate(read_ranges):
            if read_range[1] < read_range[0]:
                raise ValueError(
                    f"Invalid Input - range {position} ends before it starts: "
                    f"({read_range[0]}, {read_range[1]})"
                )

        if self.read_obj_str is None:
            raise ValueError(
                "Can not download_ranges() on a stream that has never been opened."
            )

        # read_id -> buffer for every range whose final chunk is outstanding.
        pending_buffers = {}
        step = _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
        for segment_start in range(0, len(read_ranges), step):
            segment = read_ranges[segment_start : segment_start + step]

            bidi_ranges = []
            # The read id is the range's index in the caller's list.
            for read_id, read_range in enumerate(segment, start=segment_start):
                pending_buffers[read_id] = read_range[2]
                bidi_ranges.append(
                    _storage_v2.ReadRange(
                        read_offset=read_range[0],
                        read_length=read_range[1] - read_range[0],  # end - start
                        read_id=read_id,
                    )
                )
            await self.read_obj_str.send(
                _storage_v2.BidiReadObjectRequest(read_ranges=bidi_ranges)
            )

        while pending_buffers:
            response = await self.read_obj_str.recv()
            if response is None:
                # Library code must not terminate the interpreter; surface
                # the failure to the caller instead.
                raise RuntimeError("None response received, something went wrong.")

            for object_data_range in response.object_data_ranges:
                if object_data_range.read_range is None:
                    raise RuntimeError("Invalid response, read_range is None")

                read_id = object_data_range.read_range.read_id
                try:
                    buffer = pending_buffers[read_id]
                except KeyError:
                    raise RuntimeError(
                        f"Invalid response, unexpected read_id {read_id}"
                    ) from None

                # TODO(review): checksummed_data also carries a crc32c that
                # is not verified here.
                buffer.write(object_data_range.checksummed_data.content)
                if object_data_range.range_end:
                    del pending_buffers[read_id]
class AsyncReadObjectStream(AsyncAbstractObjectStream):
    """Socket-like async wrapper around the ``bidi_read_object`` RPC.

    The constructor only prepares the RPC and its first request; nothing is
    sent until :meth:`open` is awaited.

    Attributes:
        client: Client object whose wrapped GAPIC transport supplies the RPC.
        read_handle: Read handle; replaced by the one in the first response.
        rpc: The wrapped ``bidi_read_object`` callable.
        metadata: Routing metadata sent with the RPC.
        socket_like_rpc: ``AsyncBidiRpc`` providing open/send/recv/close.
    """

    def __init__(
        self,
        client,
        bucket_name=None,
        object_name=None,
        generation_number=None,
        # TODO: meta_generation
        read_handle=None,
    ):
        """Prepare the bidi RPC and its initial request.

        Args:
            client: Client object; its ``_client._transport`` is used to look
                up the wrapped ``bidi_read_object`` method.
            bucket_name: Bucket name (without the ``projects/_/buckets/``
                prefix, which is added here).
            object_name: Object name.
            generation_number: Optional object generation. When given it is
                included in the initial read spec.
            read_handle: Optional read handle; currently only stored.
        """
        # bucket_name, object_name and generation_number are stored by the
        # base class, so they are not assigned again here.
        super().__init__(
            bucket_name=bucket_name,
            object_name=object_name,
            generation_number=generation_number,
        )
        self.client = client
        self._full_bucket_name = f"projects/_/buckets/{bucket_name}"
        self.read_handle = read_handle

        # NOTE(review): this reaches into private GAPIC transport internals
        # and may break if that interface changes; a public accessor on the
        # GAPIC client would be preferable.
        transport = self.client._client._transport
        self.rpc = transport._wrapped_methods[transport.bidi_read_object]

        spec_fields = {"bucket": self._full_bucket_name, "object": object_name}
        if generation_number is not None:
            # Without this the caller's requested generation was silently
            # ignored and whichever generation the server picked was read.
            spec_fields["generation"] = generation_number
        # TODO(review): read_handle is not forwarded in the spec yet; confirm
        # the expected handle type before wiring it in.
        first_bidi_read_req = storage_v2.BidiReadObjectRequest(
            read_object_spec=storage_v2.BidiReadObjectSpec(**spec_fields),
        )
        self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
        self.socket_like_rpc = AsyncBidiRpc(
            self.rpc, initial_request=first_bidi_read_req, metadata=self.metadata
        )

    async def open(self) -> None:
        """Open the RPC and consume the first response.

        Opening sends the initial request; one response is then received.
        ``generation_number`` is taken from that response when it was not
        supplied, and ``read_handle`` is always replaced by the response's.
        """
        await self.socket_like_rpc.open()
        response = await self.socket_like_rpc.recv()
        if self.generation_number is None:
            self.generation_number = response.metadata.generation
        self.read_handle = response.read_handle

    async def close(self):
        """Close the underlying RPC.

        Previously this delegated to the abstract base implementation, which
        always raises ``NotImplementedError``, so the stream could never be
        closed.
        """
        await self.socket_like_rpc.close()

    async def send(self, bidi_read_object_request):
        """Queue a request on the stream.

        Args:
            bidi_read_object_request: The request message to send.
        """
        # TODO: decide how send failures are handled (retries).
        await self.socket_like_rpc.send(bidi_read_object_request)

    async def recv(self):
        """Wait for and return the next response from the stream.

        Returns:
            The next response message, exactly as produced by the RPC.
        """
        # TODO: failure handling / retries, checksum verification of the
        # received data, tracing, and decompressive transcoding are all
        # still open questions.
        return await self.socket_like_rpc.recv()
class _AsyncRequestQueueGenerator:
    """Feed requests from an ``asyncio.Queue`` into a gRPC request stream.

    Instances are asynchronously iterable: an optional initial request is
    yielded first, then items are yielded as they arrive on the queue.

    This is useful for an indeterminate or open-ended set of requests on a
    request-streaming (or bidirectional) RPC: the caller decides when each
    message is sent by putting it on the queue, instead of supplying all
    messages up front. gRPC simply awaits until the queue has a message.

    It also allows retrying without swapping queues: if an item is pulled
    off the queue while the RPC is inactive, the item is put straight back
    and iteration stops, because yielding it would cause gRPC to discard it.
    In practice this means message order is not guaranteed; a priority queue
    could be used if ordering is required.

    Example::

        requests = _AsyncRequestQueueGenerator(q)
        call = await stub.StreamingRequest(requests)
        requests.call = call

        async for response in call:
            print(response)
            await q.put(...)

    Args:
        queue (asyncio.Queue): The request queue.
        initial_request (Union[protobuf.Message,
                Callable[[], protobuf.Message]]): The initial request to
            yield. This is done independently of the request queue to allow
            for easily restarting streams that require some initial
            configuration request.
    """

    def __init__(self, queue: asyncio.Queue, initial_request=None):
        self.call = None
        self._initial_request = initial_request
        self._queue = queue

    def _is_active(self):
        """Return True when no call is attached yet or the call is not done."""
        if self.call is None:
            return True
        return not self.call.done()

    async def __aiter__(self):
        first = self._initial_request
        if first is not None:
            # A callable initial request is invoked to build the message.
            yield first() if callable(first) else first

        while True:
            item = await self._queue.get()

            # ``None`` is the explicit end-of-requests sentinel.
            if item is None:
                break

            if self._is_active():
                yield item
                continue

            # The call is closed: return the item to the queue so that the
            # next call can consume it, then stop.
            await self._queue.put(item)
            _LOGGER.debug(
                "Inactive call, replacing item on queue and exiting request generator."
            )
            return

        _LOGGER.debug("Cleanly exiting request generator.")
+ self._on_call_done(exc.response) + raise + + request_generator.call = call + + # TODO: api_core should expose the future interface for wrapped + # callables as well. + if hasattr(call, "_wrapped"): # pragma: NO COVER + call._wrapped.add_done_callback(self._on_call_done) + else: + call.add_done_callback(self._on_call_done) + + self._request_generator = request_generator + self.call = call + + async def close(self): + """Closes the stream.""" + if self.call is None: + return + + await self._request_queue.put(None) + self.call.cancel() + self._request_generator = None + self._initial_request = None + self._callbacks = [] + # Don't set self.call to None. Keep it around so that send/recv can + # raise the error. + + async def send(self, request): + """Queue a message to be sent on the stream. + + If the underlying RPC has been closed, this will raise. + + Args: + request (protobuf.Message): The request to send. + """ + if self.call is None: + raise ValueError("Can not send() on an RPC that has never been opened.") + + # Don't use self.is_active(), as ResumableBidiRpc will overload it + # to mean something semantically different. + if not self.call.done(): + await self._request_queue.put(request) + else: + # calling read should cause the call to raise. + await self.call.read() + + async def recv(self): + """Wait for a message to be returned from the stream. + + If the underlying RPC has been closed, this will raise. + + Returns: + protobuf.Message: The received message. 
class BidiRpcBase:
    """Shared state and callback plumbing for bi-directional streaming RPCs.

    gRPC exposes a bidi stream as a request iterator plus a response
    iterator. Subclasses turn that into a socket-like :func:`send` /
    :func:`recv` pair, which suits long-running or asymmetric streams where
    requests and responses do not correspond one-to-one.

    Nothing here retries the stream on errors.

    Subclasses must provide ``_create_queue`` (it is called from the
    constructor) and ``is_active``.

    Args:
        start_rpc (Union[grpc.StreamStreamMultiCallable,
                    grpc.aio.StreamStreamMultiCallable]): The gRPC method used
                    to start the RPC.
        initial_request (Union[protobuf.Message,
                Callable[[], protobuf.Message]]): The initial request to
            yield. Useful when a first request is needed to start the stream.
        metadata (Sequence[Tuple(str, str)]): RPC metadata to include in
            the request.
    """

    def __init__(self, start_rpc, initial_request=None, metadata=None):
        self._start_rpc, self._initial_request = start_rpc, initial_request
        self._rpc_metadata = metadata
        # Subclass hook: picks the queue type (sync vs. async).
        self._request_queue = self._create_queue()
        self._request_generator = None
        self._callbacks = []
        self.call = None

    def _create_queue(self):
        """Return a new request queue; must be overridden by subclasses."""
        raise NotImplementedError("`_create_queue` is not implemented.")

    def add_done_callback(self, callback):
        """Register a callback to run when the RPC terminates.

        Termination covers both an RPC error and a successful finish.

        Args:
            callback (Callable[[grpc.Future], None]): The callback to execute.
                It receives the same gRPC future as the underlying stream,
                which will also be a :class:`grpc.aio.Call`.
        """
        self._callbacks.append(callback)

    def _on_call_done(self, future):
        # Runs when the RPC errors or terminates successfully. gRPC's
        # "future" may also be a grpc.RpcError here; see
        # https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
        # (`grpc.RpcError` is also `grpc.aio.Call`).
        for notify in self._callbacks:
            notify(future)

    @property
    def is_active(self):
        """bool: True if this stream is currently open and active."""
        raise NotImplementedError("`is_active` is not implemented.")

    @property
    def pending_requests(self):
        """int: Returns an estimate of the number of queued requests."""
        return self._request_queue.qsize()
However, the # dependency is still included here to accommodate users who may be @@ -41,6 +41,7 @@ "google-resumable-media >= 2.7.2, < 3.0.0", "requests >= 2.22.0, < 3.0.0", "google-crc32c >= 1.1.3, < 2.0.0", + "grpc-google-iam-v1 >= 0.14.0, <1.0.0", ] extras = { "protobuf": ["protobuf >= 3.20.2, < 7.0.0"],