This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 65
Expand file tree
/
Copy pathdata_client_snippets_async.py
More file actions
318 lines (255 loc) · 13.7 KB
/
data_client_snippets_async.py
File metadata and controls
318 lines (255 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#!/usr/bin/env python
# Copyright 2024, Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
async def write_simple(table):
    """Sample: write three cells to one row, one mutate_row RPC per cell."""
    # [START bigtable_async_write_simple]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import SetCell

    async def write_simple(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                family_id = "stats_summary"
                row_key = b"phone#4c410523#20190501"

                # One SetCell per column; each is sent as its own mutate_row call.
                mutations = (
                    SetCell(family_id, "connected_cell", 1),
                    SetCell(family_id, "connected_wifi", 1),
                    SetCell(family_id, "os_build", "PQ2A.190405.003"),
                )
                for mutation in mutations:
                    await table.mutate_row(row_key, mutation)

    # [END bigtable_async_write_simple]
    await write_simple(table.client.project, table.instance_id, table.table_id)
async def write_batch(table):
    """Sample: queue mutations for two rows through a mutations batcher."""
    # [START bigtable_async_writes_batch]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data.mutations import SetCell
    from google.cloud.bigtable.data.mutations import RowMutationEntry
    from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup

    async def write_batch(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                family_id = "stats_summary"
                try:
                    async with table.mutations_batcher() as batcher:
                        mutation_list = [
                            SetCell(family_id, "connected_cell", 1),
                            SetCell(family_id, "connected_wifi", 1),
                            SetCell(family_id, "os_build", "12155.0.0-rc1"),
                        ]
                        # awaiting the batcher.append method adds the
                        # RowMutationEntry to the batcher's queue to be
                        # written in the next flush.
                        for key in (
                            "tablet#a0b81f74#20190501",
                            "tablet#a0b81f74#20190502",
                        ):
                            await batcher.append(
                                RowMutationEntry(key, mutation_list)
                            )
                except MutationsExceptionGroup as e:
                    # MutationsExceptionGroup carries one
                    # FailedMutationEntryError per mutation that failed.
                    for sub_exception in e.exceptions:
                        failed_entry: RowMutationEntry = sub_exception.entry
                        cause: Exception = sub_exception.__cause__
                        print(
                            f"Failed mutation: {failed_entry.row_key} with error: {cause!r}"
                        )

    # [END bigtable_async_writes_batch]
    await write_batch(table.client.project, table.instance_id, table.table_id)
async def write_increment(table):
    """Sample: atomically adjust a cell value via a read-modify-write rule."""
    # [START bigtable_async_write_increment]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule

    async def write_increment(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                family_id = "stats_summary"
                row_key = "phone#4c410523#20190501"

                # A negative increment_amount decrements connected_wifi by 1.
                rule = IncrementRule(
                    family_id, "connected_wifi", increment_amount=-1
                )
                result_row = await table.read_modify_write_row(row_key, rule)

                # Inspect the server-returned row to see the updated value.
                cell = result_row[0]
                print(f"{cell.row_key} value: {int(cell)}")

    # [END bigtable_async_write_increment]
    await write_increment(table.client.project, table.instance_id, table.table_id)
async def write_conditional(table):
    """Sample: apply a mutation only when an existing cell matches a filter."""
    # [START bigtable_async_writes_conditional]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import row_filters
    from google.cloud.bigtable.data import SetCell

    async def write_conditional(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                family_id = "stats_summary"
                row_key = "phone#4c410523#20190501"

                # Predicate: the row's os_build value starts with "PQ2A.".
                predicate_filters = [
                    row_filters.FamilyNameRegexFilter(family_id),
                    row_filters.ColumnQualifierRegexFilter("os_build"),
                    row_filters.ValueRegexFilter("PQ2A\\..*"),
                ]
                row_filter = row_filters.RowFilterChain(filters=predicate_filters)

                if_true = SetCell(family_id, "os_name", "android")
                result = await table.check_and_mutate_row(
                    row_key,
                    row_filter,
                    true_case_mutations=if_true,
                    false_case_mutations=None,
                )
                if result is True:
                    print("The row os_name was set to android")

    # [END bigtable_async_writes_conditional]
    await write_conditional(table.client.project, table.instance_id, table.table_id)
async def write_aggregate(table):
    """Sample: increment an aggregate cell with an AddToCell mutation."""
    # [START bigtable_async_write_aggregate]
    import time

    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry
    from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup

    async def write_aggregate(project_id, instance_id, table_id):
        """Increments a value in a Bigtable table using AddToCell mutation."""
        async with BigtableDataClientAsync(project=project_id) as client:
            # Use the table as an async context manager so its resources are
            # released on exit, consistent with the other snippets in this file.
            async with client.get_table(instance_id, table_id) as table:
                row_key = "unique_device_ids_1"
                try:
                    async with table.mutations_batcher() as batcher:
                        # The AddToCell mutation increments the value of a cell.
                        # The `counters` family must be set up to be an aggregate
                        # family with an int64 input type.
                        reading = AddToCell(
                            family="counters",
                            qualifier="odometer",
                            value=32304,
                            # Convert nanoseconds to microseconds
                            timestamp_micros=time.time_ns() // 1000,
                        )
                        await batcher.append(
                            RowMutationEntry(row_key.encode("utf-8"), [reading])
                        )
                except MutationsExceptionGroup as e:
                    # MutationsExceptionGroup contains a FailedMutationEntryError for
                    # each mutation that failed.
                    for sub_exception in e.exceptions:
                        failed_entry: RowMutationEntry = sub_exception.entry
                        cause: Exception = sub_exception.__cause__
                        print(
                            f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}"
                        )

    # [END bigtable_async_write_aggregate]
    await write_aggregate(table.client.project, table.instance_id, table.table_id)
async def read_row(table):
    """Sample: fetch a single row by key and print it."""
    # [START bigtable_async_reads_row]
    from google.cloud.bigtable.data import BigtableDataClientAsync

    async def read_row(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                row_key = "phone#4c410523#20190501"
                fetched = await table.read_row(row_key)
                print(fetched)

    # [END bigtable_async_reads_row]
    await read_row(table.client.project, table.instance_id, table.table_id)
async def read_row_partial(table):
    """Sample: fetch a single row, restricted to one column by a filter."""
    # [START bigtable_async_reads_row_partial]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import row_filters

    async def read_row_partial(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                row_key = "phone#4c410523#20190501"
                # Only cells in the os_build column are returned.
                qualifier_filter = row_filters.ColumnQualifierRegexFilter(b"os_build")
                fetched = await table.read_row(row_key, row_filter=qualifier_filter)
                print(fetched)

    # [END bigtable_async_reads_row_partial]
    await read_row_partial(table.client.project, table.instance_id, table.table_id)
async def read_rows_multiple(table):
    """Sample: stream several rows selected by explicit keys."""
    # [START bigtable_async_reads_rows]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import ReadRowsQuery

    async def read_rows(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                wanted_keys = [
                    b"phone#4c410523#20190501",
                    b"phone#4c410523#20190502",
                ]
                query = ReadRowsQuery(row_keys=wanted_keys)
                async for row in await table.read_rows_stream(query):
                    print(row)

    # [END bigtable_async_reads_rows]
    await read_rows(table.client.project, table.instance_id, table.table_id)
async def read_row_range(table):
    """Sample: stream all rows whose keys fall inside one key range."""
    # [START bigtable_async_reads_row_range]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import ReadRowsQuery
    from google.cloud.bigtable.data import RowRange

    async def read_row_range(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                key_span = RowRange(
                    start_key=b"phone#4c410523#20190501",
                    end_key=b"phone#4c410523#201906201",
                )
                query = ReadRowsQuery(row_ranges=[key_span])
                async for row in await table.read_rows_stream(query):
                    print(row)

    # [END bigtable_async_reads_row_range]
    await read_row_range(table.client.project, table.instance_id, table.table_id)
async def read_with_prefix(table):
    """Sample: stream all rows whose keys share a common prefix."""
    # [START bigtable_async_reads_prefix]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import ReadRowsQuery
    from google.cloud.bigtable.data import RowRange

    async def read_prefix(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                prefix = "phone#"
                # The exclusive end of the range is the prefix with its final
                # character bumped to the next code point.
                last_char = prefix[-1]
                end_key = prefix[:-1] + chr(ord(last_char) + 1)
                prefix_range = RowRange(start_key=prefix, end_key=end_key)
                query = ReadRowsQuery(row_ranges=[prefix_range])
                async for row in await table.read_rows_stream(query):
                    print(row)

    # [END bigtable_async_reads_prefix]
    await read_prefix(table.client.project, table.instance_id, table.table_id)
async def read_with_filter(table):
    """Sample: stream rows whose cell values match a regex filter."""
    # [START bigtable_async_reads_filter]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data import ReadRowsQuery
    from google.cloud.bigtable.data import row_filters

    async def read_with_filter(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            async with client.get_table(instance_id, table_id) as table:
                value_filter = row_filters.ValueRegexFilter(b"PQ2A.*$")
                query = ReadRowsQuery(row_filter=value_filter)
                async for row in await table.read_rows_stream(query):
                    print(row)

    # [END bigtable_async_reads_filter]
    await read_with_filter(table.client.project, table.instance_id, table.table_id)
async def execute_query(table):
    """Sample: run a parameterized SQL query against a Bigtable instance."""
    # [START bigtable_async_execute_query]
    from google.cloud.bigtable.data import BigtableDataClientAsync

    async def execute_query(project_id, instance_id, table_id):
        async with BigtableDataClientAsync(project=project_id) as client:
            # @row_key is bound via the parameters mapping below.
            query = (
                f"SELECT _key, stats_summary['os_build'], stats_summary['connected_cell'], stats_summary['connected_wifi'] from `{table_id}` WHERE _key=@row_key"
            )
            result = await client.execute_query(
                query,
                instance_id,
                parameters={"row_key": b"phone#4c410523#20190501"},
            )
            # Drain the async result iterator into a list before printing.
            results = [r async for r in result]
            print(results)

    # [END bigtable_async_execute_query]
    await execute_query(table.client.project, table.instance_id, table.table_id)