This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 65
Expand file tree
/
Copy path_mutate_rows.py
More file actions
237 lines (219 loc) · 10 KB
/
_mutate_rows.py
File metadata and controls
237 lines (219 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
from typing import Sequence, TYPE_CHECKING
from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._metrics import tracked_retry
# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
from google.cloud.bigtable.data.mutations import _EntryWithProto
from google.cloud.bigtable.data._cross_sync import CrossSync
if TYPE_CHECKING:
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._metrics import ActiveOperationMetric
if CrossSync.is_async:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
BigtableAsyncClient as GapicClientType,
)
from google.cloud.bigtable.data._async.client import ( # type: ignore
_DataApiTargetAsync as TargetType,
)
else:
from google.cloud.bigtable_v2.services.bigtable.client import ( # type: ignore
BigtableClient as GapicClientType,
)
from google.cloud.bigtable.data._sync_autogen.client import ( # type: ignore
_DataApiTarget as TargetType,
)
__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen._mutate_rows"
@CrossSync.convert_class("_MutateRowsOperation")
class _MutateRowsOperationAsync:
    """
    MutateRowsOperation manages the logic of sending a set of row mutations,
    and retrying on failed entries. It manages this using the _run_attempt
    function, which attempts to mutate all outstanding entries, and raises
    _MutateRowsIncomplete if any retryable errors are encountered.
    Errors are exposed as a MutationsExceptionGroup, which contains a list of
    exceptions organized by the related failed mutation entries.
    Args:
        gapic_client: the client to use for the mutate_rows call
        target: the table or view associated with the request
        mutation_entries: a list of RowMutationEntry objects to send to the server
        operation_timeout: the timeout to use for the entire operation, in seconds.
        attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
            If not specified, the request will run until operation_timeout is reached.
        metric: the metric object representing the active operation
        retryable_exceptions: a list of exceptions that should be retried
    Raises:
        ValueError: if the total number of mutations across all entries exceeds
            the per-request limit
    """

    @CrossSync.convert
    def __init__(
        self,
        gapic_client: GapicClientType,
        target: TargetType,
        mutation_entries: list["RowMutationEntry"],
        operation_timeout: float,
        attempt_timeout: float | None,
        metric: ActiveOperationMetric,
        retryable_exceptions: Sequence[type[Exception]] = (),
    ):
        # check that mutations are within limits
        total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
        if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
            raise ValueError(
                "mutate_rows requests can contain at most "
                f"{_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across "
                f"all entries. Found {total_mutations}."
            )
        self._target = target
        self._gapic_fn = gapic_client.mutate_rows
        # create predicate for determining which errors are retryable
        self.is_retryable = retries.if_exception_type(
            # RPC level errors
            *retryable_exceptions,
            # Entry level errors
            bt_exceptions._MutateRowsIncomplete,
        )
        # wrap _run_attempt in a retry loop, tracked by the operation metric;
        # each call to self._operation() runs a fresh retryable operation
        self._operation = lambda: tracked_retry(
            retry_fn=CrossSync.retry_target,
            operation=metric,
            target=self._run_attempt,
            predicate=self.is_retryable,
            timeout=operation_timeout,
        )
        # initialize state
        self.timeout_generator = _attempt_timeout_generator(
            attempt_timeout, operation_timeout
        )
        # pair each entry with its pre-serialized proto, so serialization
        # happens once rather than on every attempt
        self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries]
        # indices into self.mutations that still need to be (re)sent
        self.remaining_indices = list(range(len(self.mutations)))
        # maps mutation index -> list of exceptions encountered across attempts
        self.errors: dict[int, list[Exception]] = {}
        # set up metrics
        self._operation_metric = metric

    @CrossSync.convert
    async def start(self):
        """
        Start the operation, and run until completion
        Raises:
            MutationsExceptionGroup: if any mutations failed
        """
        with self._operation_metric:
            try:
                # trigger mutate_rows
                await self._operation()
            except Exception as exc:
                # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
                incomplete_indices = self.remaining_indices.copy()
                for idx in incomplete_indices:
                    self._handle_entry_error(idx, exc)
            finally:
                # raise exception detailing incomplete mutations
                all_errors: list[Exception] = []
                for idx, exc_list in self.errors.items():
                    if len(exc_list) == 0:
                        # invariant violation: an errors entry is only created
                        # together with an appended exception
                        raise core_exceptions.ClientError(
                            f"Mutation {idx} failed with no associated errors"
                        )
                    elif len(exc_list) == 1:
                        cause_exc = exc_list[0]
                    else:
                        # multiple attempts failed; bundle all causes together
                        cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
                    entry = self.mutations[idx].entry
                    all_errors.append(
                        bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
                    )
                if all_errors:
                    raise bt_exceptions.MutationsExceptionGroup(
                        all_errors, len(self.mutations)
                    )

    @CrossSync.convert
    async def _run_attempt(self):
        """
        Run a single attempt of the mutate_rows rpc.
        Raises:
            _MutateRowsIncomplete: if there are failed mutations eligible for
                retry after the attempt is complete
            GoogleAPICallError: if the gapic rpc fails
        """
        # register attempt start
        self._operation_metric.start_attempt()
        request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
        # track mutations in this request that have not been finalized yet,
        # keyed by their index within this request's entry list
        active_request_indices = dict(enumerate(self.remaining_indices))
        # reset; _handle_entry_error repopulates this with retryable failures
        self.remaining_indices = []
        if not request_entries:
            # no more mutations. return early
            return
        # make gapic request
        try:
            result_generator = await self._gapic_fn(
                request=types_pb.MutateRowsRequest(
                    entries=request_entries,
                    app_profile_id=self._target.app_profile_id,
                    **self._target._request_path,
                ),
                timeout=next(self.timeout_generator),
                retry=None,
            )
            async for result_list in result_generator:
                for result in result_list.entries:
                    # convert sub-request index to global index
                    orig_idx = active_request_indices[result.index]
                    if result.status.code != 0:
                        # mutation failed; update error list (and remaining_indices if retryable).
                        # build the exception only for failures, to avoid
                        # allocating one per successful entry
                        entry_error = core_exceptions.from_grpc_status(
                            result.status.code,
                            result.status.message,
                            details=result.status.details,
                        )
                        self._handle_entry_error(orig_idx, entry_error)
                    elif orig_idx in self.errors:
                        # mutation succeeded after earlier failures; clear its error history
                        del self.errors[orig_idx]
                    # remove processed entry from active list
                    del active_request_indices[result.index]
        except Exception as exc:
            # add this exception to list for each mutation that wasn't
            # already handled, and update remaining_indices if mutation is retryable
            for idx in active_request_indices.values():
                self._handle_entry_error(idx, exc)
            # bubble up exception to be handled by retry wrapper
            raise
        # check if attempt succeeded, or needs to be retried
        if self.remaining_indices:
            # unfinished work; raise exception to trigger retry
            raise bt_exceptions._MutateRowsIncomplete

    def _handle_entry_error(self, idx: int, exc: Exception) -> None:
        """
        Add an exception to the list of exceptions for a given mutation index,
        and add the index to the list of remaining indices if the exception is
        retryable.
        Args:
            idx: the index of the mutation that failed
            exc: the exception to add to the list
        """
        entry = self.mutations[idx].entry
        self.errors.setdefault(idx, []).append(exc)
        # only idempotent mutations with a retryable error are re-queued
        if (
            entry.is_idempotent()
            and self.is_retryable(exc)
            and idx not in self.remaining_indices
        ):
            self.remaining_indices.append(idx)