Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions src/ipc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void IpcServer::on_new_connection(uv_stream_t* server_stream, int status)
IpcServer* server = static_cast<IpcServer*>(server_stream->data);
server->reset_idle_timer();

auto client = std::make_unique<ClientConnection>();
auto client = std::make_shared<ClientConnection>();
client->server = server;

int r = uv_pipe_init(&server->_loop, &client->handle, 0);
Expand All @@ -149,11 +149,13 @@ void IpcServer::on_new_connection(uv_stream_t* server_stream, int status)
}
client->handle.data = client.get();

// Track client before any uv_close call so on_close always finds a valid shared_ptr
server->_clients[&client->handle] = client;

r = uv_accept(server_stream, reinterpret_cast<uv_stream_t*>(&client->handle));
if (r != 0) {
LOG("Failed to accept connection: " + std::string(uv_strerror(r)));
uv_close(reinterpret_cast<uv_handle_t*>(&client->handle), on_close);
client.release(); // Will be deleted in on_close
return;
}

Expand All @@ -168,8 +170,6 @@ void IpcServer::on_new_connection(uv_stream_t* server_stream, int status)
LOG("Failed to start reading: " + std::string(uv_strerror(r)));
uv_close(reinterpret_cast<uv_handle_t*>(&client->handle), on_close);
}

client.release(); // Ownership transferred to libuv callbacks
}

void IpcServer::alloc_buffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
Expand Down Expand Up @@ -231,21 +231,26 @@ void IpcServer::process_client_data(ClientConnection& client)
offset += key_len;

switch (request_type) {
case REQ_GET:
case REQ_GET: {
LOG("GET request for key " + hex_key);
_storage_client.get(hex_key, [&](StorageResponse&& response) {
auto client_ptr = client.shared_from_this();
_storage_client.get(hex_key, [this, client_ptr](StorageResponse&& response) {
if (client_ptr->disconnected) {
return;
}
if (response.result == StorageResult::OK) {
std::vector<uint8_t> header;
header.reserve(9);
header.push_back(STATUS_OK);
write_u64_host_byte_order(header, response.data.size());
send_response(client, std::move(header));
send_response(client, std::move(response.data));
send_response(*client_ptr, std::move(header));
send_response(*client_ptr, std::move(response.data));
} else {
send_simple_response(client, "GET", response);
send_simple_response(*client_ptr, "GET", response);
}
});
break;
}

case REQ_PUT: {
if (len < offset + 1) {
Expand All @@ -267,19 +272,28 @@ void IpcServer::process_client_data(ClientConnection& client)
bool overwrite = (flags & PUT_FLAG_OVERWRITE) != 0;
LOG("PUT request for key " + hex_key + " (" + std::to_string(value.size()) + " bytes)");

_storage_client.put(hex_key, std::move(value), overwrite, [&](StorageResponse&& response) {
send_simple_response(client, "PUT", response);
auto client_ptr = client.shared_from_this();
_storage_client.put(hex_key, std::move(value), overwrite, [this, client_ptr](StorageResponse&& response) {
if (client_ptr->disconnected) {
return;
}
send_simple_response(*client_ptr, "PUT", response);
});
break;
}

case REQ_REMOVE:
case REQ_REMOVE: {
LOG("REMOVE request for key " + hex_key);
_storage_client.remove(hex_key, [&](StorageResponse&& response) {
send_simple_response(client, "REMOVE", response);
auto client_ptr = client.shared_from_this();
_storage_client.remove(hex_key, [this, client_ptr](StorageResponse&& response) {
if (client_ptr->disconnected) {
return;
}
send_simple_response(*client_ptr, "REMOVE", response);
});
break;
}
}

buf.erase(buf.begin(), buf.begin() + offset);
}
Expand Down Expand Up @@ -351,5 +365,9 @@ void IpcServer::on_write_complete(uv_write_t* req, int status)
void IpcServer::on_close(uv_handle_t* handle)
{
LOG("Client disconnected");
delete static_cast<ClientConnection*>(handle->data);
auto* client = static_cast<ClientConnection*>(handle->data);
client->disconnected = true;
// Remove from _clients map; shared_ptr prevent premature deletion
// if callbacks are still pending
client->server->_clients.erase(reinterpret_cast<uv_pipe_t*>(handle));
}
5 changes: 4 additions & 1 deletion src/ipc_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_map>
#include <vector>

class IpcServer;

struct ClientConnection
struct ClientConnection : public std::enable_shared_from_this<ClientConnection>
{
uv_pipe_t handle;
IpcServer* server;
std::vector<uint8_t> read_buf;
std::vector<char> alloc_buf; // Reusable buffer for libuv reads
bool writing = false;
bool disconnected = false; // Set when client disconnects, prevents sending to closed pipe
std::vector<std::vector<uint8_t>> write_queue;
};

Expand Down Expand Up @@ -59,4 +61,5 @@ class IpcServer
StorageClient& _storage_client;
uv_pipe_t _server_pipe;
uv_timer_t _idle_timer;
std::unordered_map<uv_pipe_t*, std::shared_ptr<ClientConnection>> _clients;
};
Loading