Skip to content

Commit 5208f05

Browse files
committed
fix(storage): Resolve potential race condition in AsyncWriterConnectionImpl
1 parent fc9f8f0 commit 5208f05

2 files changed

Lines changed: 21 additions & 3 deletions

File tree

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,16 @@ AsyncWriterConnectionImpl::~AsyncWriterConnectionImpl() {
9494
// (2) calls to `Write()`, `Finalize()`, and `Query()` must have completed
9595
// by the time the destructor is called
9696
Finish();
97+
98+
// We use a local copy of `impl` moved under lock to avoid
99+
// data races with concurrent calls to `Finish()` from callbacks.
100+
std::unique_lock<std::mutex> lk(mu_);
101+
auto impl = std::move(impl_);
102+
lk.unlock();
103+
97104
// When `impl_->Finish()` is satisfied then `finished_` is satisfied too.
98105
// This extends the lifetime of `impl_` until it is safe to delete.
99-
finished_.then([impl = std::move(impl_)](auto) mutable {
106+
finished_.then([impl = std::move(impl)](auto) mutable {
100107
// Break the ownership cycle between the completion queue and this callback.
101108
impl.reset();
102109
});
@@ -213,7 +220,11 @@ AsyncWriterConnectionImpl::OnFinalUpload(std::size_t upload_size,
213220
.then(transform);
214221
}
215222
offset_ += upload_size;
216-
return impl_->Read()
223+
224+
std::unique_lock<std::mutex> lk(mu_);
225+
auto impl = impl_;
226+
lk.unlock();
227+
return impl->Read()
217228
.then([this](auto f) { return OnQuery(f.get()); })
218229
.then([this](auto g) -> StatusOr<google::storage::v2::Object> {
219230
auto status = g.get();
@@ -258,11 +269,15 @@ future<StatusOr<std::int64_t>> AsyncWriterConnectionImpl::OnQuery(
258269
}
259270

260271
future<Status> AsyncWriterConnectionImpl::Finish() {
272+
std::unique_lock<std::mutex> lk(mu_);
261273
if (std::exchange(finish_called_, true)) {
262274
return make_ready_future(
263275
internal::CancelledError("already finished", GCP_ERROR_INFO()));
264276
}
265-
return impl_->Finish().then([p = std::move(on_finish_)](auto f) mutable {
277+
auto impl = impl_;
278+
lk.unlock();
279+
280+
return impl->Finish().then([p = std::move(on_finish_)](auto f) mutable {
266281
p.set_value();
267282
return f.get();
268283
});

google/cloud/storage/internal/async/writer_connection_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <atomic>
2525
#include <cstdint>
2626
#include <memory>
27+
#include <mutex>
2728

2829
namespace google {
2930
namespace cloud {
@@ -111,6 +112,8 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection {
111112

112113
// Track the latest write handle seen in responses.
113114
absl::optional<google::storage::v2::BidiWriteHandle> latest_write_handle_;
115+
116+
std::mutex mu_;
114117
};
115118

116119
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

0 commit comments

Comments
 (0)