diff --git a/src/ipc_server.cpp b/src/ipc_server.cpp index ea6c854..198e6fc 100644 --- a/src/ipc_server.cpp +++ b/src/ipc_server.cpp @@ -139,7 +139,7 @@ void IpcServer::on_new_connection(uv_stream_t* server_stream, int status) IpcServer* server = static_cast(server_stream->data); server->reset_idle_timer(); - auto client = std::make_unique(); + auto client = std::make_shared(); client->server = server; int r = uv_pipe_init(&server->_loop, &client->handle, 0); @@ -149,11 +149,13 @@ void IpcServer::on_new_connection(uv_stream_t* server_stream, int status) } client->handle.data = client.get(); + // Track client before any uv_close call so on_close always finds a valid shared_ptr + server->_clients[&client->handle] = client; + r = uv_accept(server_stream, reinterpret_cast(&client->handle)); if (r != 0) { LOG("Failed to accept connection: " + std::string(uv_strerror(r))); uv_close(reinterpret_cast(&client->handle), on_close); - client.release(); // Will be deleted in on_close return; } @@ -168,8 +170,6 @@ void IpcServer::on_new_connection(uv_stream_t* server_stream, int status) LOG("Failed to start reading: " + std::string(uv_strerror(r))); uv_close(reinterpret_cast(&client->handle), on_close); } - - client.release(); // Ownership transferred to libuv callbacks } void IpcServer::alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) @@ -231,21 +231,26 @@ void IpcServer::process_client_data(ClientConnection& client) offset += key_len; switch (request_type) { - case REQ_GET: + case REQ_GET: { LOG("GET request for key " + hex_key); - _storage_client.get(hex_key, [&](StorageResponse&& response) { + auto client_ptr = client.shared_from_this(); + _storage_client.get(hex_key, [this, client_ptr](StorageResponse&& response) { + if (client_ptr->disconnected) { + return; + } if (response.result == StorageResult::OK) { std::vector header; header.reserve(9); header.push_back(STATUS_OK); write_u64_host_byte_order(header, response.data.size()); - send_response(client, std::move(header)); - send_response(client, std::move(response.data)); + send_response(*client_ptr, std::move(header)); + send_response(*client_ptr, std::move(response.data)); } else { - send_simple_response(client, "GET", response); + send_simple_response(*client_ptr, "GET", response); } }); break; + } case REQ_PUT: { if (len < offset + 1) { @@ -267,19 +272,28 @@ void IpcServer::process_client_data(ClientConnection& client) bool overwrite = (flags & PUT_FLAG_OVERWRITE) != 0; LOG("PUT request for key " + hex_key + " (" + std::to_string(value.size()) + " bytes)"); - _storage_client.put(hex_key, std::move(value), overwrite, [&](StorageResponse&& response) { - send_simple_response(client, "PUT", response); + auto client_ptr = client.shared_from_this(); + _storage_client.put(hex_key, std::move(value), overwrite, [this, client_ptr](StorageResponse&& response) { + if (client_ptr->disconnected) { + return; + } + send_simple_response(*client_ptr, "PUT", response); }); break; } - case REQ_REMOVE: + case REQ_REMOVE: { LOG("REMOVE request for key " + hex_key); - _storage_client.remove(hex_key, [&](StorageResponse&& response) { - send_simple_response(client, "REMOVE", response); + auto client_ptr = client.shared_from_this(); + _storage_client.remove(hex_key, [this, client_ptr](StorageResponse&& response) { + if (client_ptr->disconnected) { + return; + } + send_simple_response(*client_ptr, "REMOVE", response); }); break; } + } buf.erase(buf.begin(), buf.begin() + offset); } @@ -351,5 +365,9 @@ void IpcServer::on_write_complete(uv_write_t* req, int status) void IpcServer::on_close(uv_handle_t* handle) { LOG("Client disconnected"); - delete static_cast(handle->data); + auto* client = static_cast(handle->data); + client->disconnected = true; + // Remove from _clients map; shared_ptr prevent premature deletion + // if callbacks are still pending + client->server->_clients.erase(reinterpret_cast(handle)); } diff --git a/src/ipc_server.hpp b/src/ipc_server.hpp index c901c2a..11639ed 100644 --- a/src/ipc_server.hpp +++ b/src/ipc_server.hpp @@ -11,17 +11,19 @@ #include #include #include +#include #include class IpcServer; -struct ClientConnection +struct ClientConnection : public std::enable_shared_from_this { uv_pipe_t handle; IpcServer* server; std::vector read_buf; std::vector alloc_buf; // Reusable buffer for libuv reads bool writing = false; + bool disconnected = false; // Set when client disconnects, prevents sending to closed pipe std::vector> write_queue; }; @@ -59,4 +61,5 @@ class IpcServer StorageClient& _storage_client; uv_pipe_t _server_pipe; uv_timer_t _idle_timer; + std::unordered_map> _clients; };