Skip to content

Commit f0bb606

Browse files
authored
feat(storage): Extend idempotency token use to all async mutation operations (#16100)
1 parent cf931ff commit f0bb606

5 files changed

Lines changed: 102 additions & 14 deletions

File tree

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ future<StatusOr<google::storage::v2::Object>> AsyncConnectionImpl::InsertObject(
169169

170170
auto hash_function = CreateHashFunction(*options);
171171
ApplyRoutingHeaders(*context, request.write_object_spec());
172-
context->AddMetadata("x-goog-gcs-idempotency-token", id);
172+
AddIdempotencyToken(*context, id);
173173
auto rpc = stub->AsyncWriteObject(cq, std::move(context), options);
174174
rpc = std::make_unique<StreamingRpcTimeout>(cq, timeout, timeout,
175175
std::move(rpc));
@@ -336,13 +336,14 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
336336
// NOLINTNEXTLINE(bugprone-lambda-function-name)
337337
backoff = std::move(backoff), current, function_name = __func__,
338338
// Use shared_ptr to propagate RoutingHeaderOptions across retries.
339-
current_routing_options = std::make_shared<RoutingHeaderOptions>()](
339+
current_routing_options = std::make_shared<RoutingHeaderOptions>(),
340+
id = invocation_id_generator_.MakeInvocationId()](
340341
google::storage::v2::BidiWriteObjectRequest req) {
341-
auto call = [stub, request = std::move(req), current_routing_options](
342-
CompletionQueue& cq,
343-
std::shared_ptr<grpc::ClientContext> context,
344-
google::cloud::internal::ImmutableOptions options,
345-
RequestPlaceholder const&) mutable
342+
auto call = [stub, request = std::move(req), current_routing_options,
343+
id](CompletionQueue& cq,
344+
std::shared_ptr<grpc::ClientContext> context,
345+
google::cloud::internal::ImmutableOptions options,
346+
RequestPlaceholder const&) mutable
346347
-> future<StatusOr<WriteObject::WriteResult>> {
347348
auto start_timeout = ScaleStallTimeout(
348349
options->get<storage::TransferStallTimeoutOption>(),
@@ -359,6 +360,8 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
359360
ApplyRoutingHeaders(*context, request.append_object_spec(),
360361
*current_routing_options);
361362

363+
AddIdempotencyToken(*context, id);
364+
362365
auto rpc = stub->AsyncBidiWriteObject(cq, std::move(context),
363366
std::move(options));
364367
rpc = std::make_unique<StreamingRpcTimeout>(
@@ -507,11 +510,12 @@ AsyncConnectionImpl::ComposeObject(ComposeObjectParams p) {
507510
auto current = internal::MakeImmutableOptions(std::move(p.options));
508511
auto const idempotency =
509512
idempotency_policy(*current)->ComposeObject(p.request);
510-
auto call = [stub = stub_](
513+
auto call = [stub = stub_, id = invocation_id_generator_.MakeInvocationId()](
511514
CompletionQueue& cq,
512515
std::shared_ptr<grpc::ClientContext> context,
513516
google::cloud::internal::ImmutableOptions options,
514517
google::storage::v2::ComposeObjectRequest const& request) {
518+
AddIdempotencyToken(*context, id);
515519
return stub->AsyncComposeObject(cq, std::move(context), std::move(options),
516520
request);
517521
};
@@ -530,10 +534,11 @@ future<Status> AsyncConnectionImpl::DeleteObject(DeleteObjectParams p) {
530534
auto backoff = backoff_policy(*current);
531535
return google::cloud::internal::AsyncRetryLoop(
532536
std::move(retry), std::move(backoff), idempotency, cq_,
533-
[stub = stub_](CompletionQueue& cq,
534-
std::shared_ptr<grpc::ClientContext> context,
535-
google::cloud::internal::ImmutableOptions options,
536-
google::storage::v2::DeleteObjectRequest const& proto) {
537+
[stub = stub_, id = invocation_id_generator_.MakeInvocationId()](
538+
CompletionQueue& cq, std::shared_ptr<grpc::ClientContext> context,
539+
google::cloud::internal::ImmutableOptions options,
540+
google::storage::v2::DeleteObjectRequest const& proto) {
541+
AddIdempotencyToken(*context, id);
537542
return stub->AsyncDeleteObject(cq, std::move(context),
538543
std::move(options), proto);
539544
},

google/cloud/storage/internal/async/connection_impl_test.cc

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "google/cloud/storage/internal/async/connection_impl.h"
1616
#include "google/cloud/storage/async/idempotency_policy.h"
1717
#include "google/cloud/storage/async/retry_policy.h"
18+
#include "google/cloud/storage/async/writer_connection.h"
1819
#include "google/cloud/storage/internal/async/default_options.h"
1920
#include "google/cloud/storage/internal/async/write_payload_impl.h"
2021
#include "google/cloud/storage/options.h"
@@ -105,9 +106,14 @@ TEST_F(AsyncConnectionImplTest, ComposeObject) {
105106
return StatusOr<google::storage::v2::Object>(TransientError());
106107
});
107108
})
108-
.WillOnce([&](CompletionQueue&, auto,
109+
.WillOnce([&](CompletionQueue&,
110+
std::shared_ptr<grpc::ClientContext> const& context,
109111
google::cloud::internal::ImmutableOptions const& options,
110112
google::storage::v2::ComposeObjectRequest const& request) {
113+
EXPECT_THAT(
114+
GetMetadata(*context),
115+
testing::Contains(testing::Pair("x-goog-gcs-idempotency-token",
116+
testing::Not(testing::IsEmpty()))));
111117
// Verify at least one option is initialized with the correct value.
112118
EXPECT_EQ(options->get<AuthorityOption>(), kAuthority);
113119
auto expected = google::storage::v2::ComposeObjectRequest{};
@@ -205,9 +211,14 @@ TEST_F(AsyncConnectionImplTest, DeleteObject) {
205211
return TransientError();
206212
});
207213
})
208-
.WillOnce([&](CompletionQueue&, auto,
214+
.WillOnce([&](CompletionQueue&,
215+
std::shared_ptr<grpc::ClientContext> const& context,
209216
google::cloud::internal::ImmutableOptions const& options,
210217
google::storage::v2::DeleteObjectRequest const& request) {
218+
EXPECT_THAT(
219+
GetMetadata(*context),
220+
testing::Contains(testing::Pair("x-goog-gcs-idempotency-token",
221+
testing::Not(testing::IsEmpty()))));
211222
// Verify at least one option is initialized with the correct values.
212223
EXPECT_EQ(options->get<AuthorityOption>(), kAuthority);
213224
google::storage::v2::DeleteObjectRequest expected;
@@ -344,6 +355,63 @@ TEST_F(AsyncConnectionImplTest, RewriteObject) {
344355
EXPECT_THAT(r1.get(), IsOkAndHolds(match_progress(1000, 3000)));
345356
}
346357

358+
TEST_F(AsyncConnectionImplTest, AppendableObjectUploadToken) {
359+
auto constexpr kRequestText = R"pb(
360+
write_object_spec {
361+
resource {
362+
bucket: "projects/_/buckets/test-bucket"
363+
name: "test-object"
364+
content_type: "text/plain"
365+
}
366+
}
367+
)pb";
368+
AsyncSequencer<bool> sequencer;
369+
auto mock = std::make_shared<storage::testing::MockStorageStub>();
370+
371+
EXPECT_CALL(*mock, AsyncBidiWriteObject)
372+
.WillOnce([&](CompletionQueue const&,
373+
std::shared_ptr<grpc::ClientContext> const& context,
374+
internal::ImmutableOptions const&) {
375+
EXPECT_THAT(
376+
GetMetadata(*context),
377+
testing::Contains(testing::Pair("x-goog-gcs-idempotency-token",
378+
testing::Not(testing::IsEmpty()))));
379+
380+
auto stream = std::make_unique<::google::cloud::storage::testing::
381+
MockAsyncBidiWriteObjectStream>();
382+
EXPECT_CALL(*stream, Start).WillOnce([&] {
383+
return sequencer.PushBack("Start");
384+
});
385+
EXPECT_CALL(*stream, Finish).WillOnce([&] {
386+
return sequencer.PushBack("Finish").then(
387+
[](auto) { return Status(StatusCode::kCancelled, "cancelled"); });
388+
});
389+
using AsyncBidiWriteObjectStream =
390+
::google::cloud::AsyncStreamingReadWriteRpc<
391+
google::storage::v2::BidiWriteObjectRequest,
392+
google::storage::v2::BidiWriteObjectResponse>;
393+
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
394+
});
395+
396+
internal::AutomaticallyCreatedBackgroundThreads pool(1);
397+
auto connection = MakeTestConnection(pool.cq(), mock);
398+
399+
auto request = google::storage::v2::BidiWriteObjectRequest{};
400+
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
401+
auto pending = connection->StartAppendableObjectUpload(
402+
{std::move(request), connection->options()});
403+
404+
auto next = sequencer.PopFrontWithName();
405+
EXPECT_EQ(next.second, "Start");
406+
next.first.set_value(false); // Fail to start
407+
408+
next = sequencer.PopFrontWithName();
409+
EXPECT_EQ(next.second, "Finish");
410+
next.first.set_value(true);
411+
412+
EXPECT_THAT(pending.get(), StatusIs(StatusCode::kCancelled));
413+
}
414+
347415
} // namespace
348416
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
349417
} // namespace storage_internal

google/cloud/storage/internal/grpc/configure_client_context.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ void AddIdempotencyToken(grpc::ClientContext& ctx,
4747
}
4848
}
4949

50+
void AddIdempotencyToken(grpc::ClientContext& ctx, std::string const& token) {
51+
ctx.AddMetadata(kIdempotencyTokenHeader, token);
52+
}
53+
5054
void ApplyRoutingHeaders(
5155
grpc::ClientContext& context,
5256
storage::internal::InsertObjectMediaRequest const& request) {

google/cloud/storage/internal/grpc/configure_client_context.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ struct RoutingHeaderOptions {
3737
void AddIdempotencyToken(grpc::ClientContext& ctx,
3838
rest_internal::RestContext const& context);
3939

40+
/// Configures @p ctx using @p token.
41+
void AddIdempotencyToken(grpc::ClientContext& ctx, std::string const& token);
42+
4043
/**
4144
* Inject request query parameters into grpc::ClientContext.
4245
*

google/cloud/storage/internal/grpc/configure_client_context_test.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ TEST_F(GrpcConfigureClientContext, AddIdempotencyToken) {
5555
Contains(Pair("x-goog-gcs-idempotency-token", "token-123")));
5656
}
5757

58+
TEST_F(GrpcConfigureClientContext, AddIdempotencyTokenString) {
59+
grpc::ClientContext ctx;
60+
AddIdempotencyToken(ctx, "token-123");
61+
auto metadata = GetMetadata(ctx);
62+
EXPECT_THAT(metadata,
63+
Contains(Pair("x-goog-gcs-idempotency-token", "token-123")));
64+
}
65+
5866
TEST_F(GrpcConfigureClientContext, ApplyQueryParametersEmpty) {
5967
grpc::ClientContext ctx;
6068
ApplyQueryParameters(

0 commit comments

Comments
 (0)