Patch notes (free-form description; nothing here is part of a hunk, the patch starts at the "diff --git" header).

What the patch does, as far as this copy shows:

  source/http_connection.c
    1. struct http_connection_binding drops the release_called/shutdown_called flags and gains a
       struct aws_ref_count ref_count member.
    2. s_connection_destroy() now takes void *user_data so it can be handed to aws_ref_count_init();
       it still drops py_core and frees the binding.
    3. s_connection_release() is removed. The capsule destructor calls
       aws_http_connection_release(connection->native) and then aws_ref_count_release().
    4. s_on_connection_shutdown() no longer tracks flags; after calling _on_shutdown it calls
       aws_ref_count_release() before PyGILState_Release().
    5. s_on_client_connection_setup() calls aws_ref_count_acquire() when error_code is 0, before
       PyCapsule_New(). When the native connection exists but no capsule was created it releases one
       binding ref and then the native connection; when there is no native connection it releases
       the only ref.
    6. aws_py_http_client_connection_new() calls aws_ref_count_init() right after allocating the
       binding and, on failure, calls aws_ref_count_release() instead of s_connection_destroy().

  test/test_http_connection_lifetime.py (new file)
    Three tests against a local HTTPServer: release before shutdown, shutdown before release, and a
    50-iteration loop that closes the connection and drops it from a worker thread.

Review notes:

  NOTE(review): s_on_connection_shutdown() returns early when aws_py_gilstate_ensure() fails, which is
  before the new aws_ref_count_release() call, so that reference is not released on that path. The
  existing comment suggests this is deliberate; confirm.

  NOTE(review): in the capsule-failure branch of s_on_client_connection_setup() the binding ref is
  released before connection->native is read, the reverse of the order used in the capsule
  destructor. This presumably relies on the acquired connection-thread ref still being held; consider
  releasing the native connection first so both paths match. TODO confirm.

  NOTE(review): in test_concurrent_release_and_shutdown_stress, release_connection() only does
  "del conn" on its own parameter. The Thread object presumably still references its args while the
  target runs, so the last reference may be dropped outside the try/except; verify. t.is_alive() is
  also not checked after t.join(self.timeout), so a worker that never finishes would not fail the test.

  NOTE(review): this copy of the patch has lost its line breaks, and the #include lines in the first
  hunk have no header names. Restore both from the original patch before applying it.

diff --git a/source/http_connection.c b/source/http_connection.c index 6a000fe40..0b5facf99 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -7,6 +7,7 @@ #include "io.h" #include +#include #include #include #include @@ -16,20 +17,22 @@ static const char *s_capsule_name_http_connection = "aws_http_connection"; /** * Lifetime notes: - * - If connect() reports immediate failure, binding can be destroyed. - * - If on_connection_setup reports failure, binding can be destroyed. - * - Otherwise, binding cannot be destroyed until BOTH release() has been called AND on_connection_shutdown has fired. + * - Binding starts with ref_count=1 (owned by whoever will destroy on failure, or by the capsule on success). + * - If on_connection_setup succeeds, acquire another refcount. The additional ref is for the connection + * thread to invoke callbacks (e.g. s_on_connection_shutdown) with a valid binding. The shutdown callback + * is the last callback invoked by the connection thread, so it releases this ref. + * - The last release (ref_count 1→0) calls s_connection_destroy(). 
*/ struct http_connection_binding { struct aws_http_connection *native; /* Reference to python object that reference to other related python object to keep it alive */ PyObject *py_core; - bool release_called; - bool shutdown_called; + struct aws_ref_count ref_count; }; -static void s_connection_destroy(struct http_connection_binding *connection) { +static void s_connection_destroy(void *user_data) { + struct http_connection_binding *connection = user_data; Py_XDECREF(connection->py_core); aws_mem_release(aws_py_get_allocator(), connection); @@ -40,38 +43,23 @@ struct aws_http_connection *aws_py_get_http_connection(PyObject *connection) { connection, s_capsule_name_http_connection, "HttpConnectionBase", http_connection_binding); } -static void s_connection_release(struct http_connection_binding *connection) { - AWS_FATAL_ASSERT(!connection->release_called); - connection->release_called = true; - - bool destroy_after_release = connection->shutdown_called; +static void s_connection_capsule_destructor(PyObject *capsule) { + struct http_connection_binding *connection = PyCapsule_GetPointer(capsule, s_capsule_name_http_connection); aws_http_connection_release(connection->native); - if (destroy_after_release) { - s_connection_destroy(connection); - } -} - -static void s_connection_capsule_destructor(PyObject *capsule) { - struct http_connection_binding *connection = PyCapsule_GetPointer(capsule, s_capsule_name_http_connection); - s_connection_release(connection); + aws_ref_count_release(&connection->ref_count); } static void s_on_connection_shutdown(struct aws_http_connection *native_connection, int error_code, void *user_data) { (void)native_connection; struct http_connection_binding *connection = user_data; - AWS_FATAL_ASSERT(!connection->shutdown_called); PyGILState_STATE state; if (aws_py_gilstate_ensure(&state)) { return; /* Python has shut down. 
Nothing matters anymore, but don't crash */ } - connection->shutdown_called = true; - - bool destroy_after_shutdown = connection->release_called; - /* Invoke on_shutdown, then clear our reference to it */ PyObject *result = PyObject_CallMethod(connection->py_core, "_on_shutdown", "(i)", error_code); @@ -82,9 +70,8 @@ static void s_on_connection_shutdown(struct aws_http_connection *native_connecti PyErr_WriteUnraisable(PyErr_Occurred()); } - if (destroy_after_shutdown) { - s_connection_destroy(connection); - } + /* This is the last callback invoked by the connection thread. Release the connection-thread ref. */ + aws_ref_count_release(&connection->ref_count); PyGILState_Release(state); } @@ -107,6 +94,10 @@ static void s_on_client_connection_setup( /* If setup was successful, encapsulate binding so we can pass it to python */ PyObject *capsule = NULL; if (!error_code) { + /* Acquire ref for the connection thread to invoke callbacks with a valid binding. + * The shutdown callback is the last one invoked, and will release this ref. */ + aws_ref_count_acquire(&connection->ref_count); + capsule = PyCapsule_New(connection, s_capsule_name_http_connection, s_connection_capsule_destructor); if (!capsule) { error_code = AWS_ERROR_UNKNOWN; @@ -125,13 +116,18 @@ static void s_on_client_connection_setup( } if (native_connection) { - /* Connection exists, but failed to create capsule. Release connection, which eventually destroys binding */ if (!capsule) { - s_connection_release(connection); + /* Native connection exists but capsule creation failed. We won't use the connection as a capsule, + * but the connection thread is still running. Release the initial ref (no capsule to own it), + * then release the native connection to initiate shutdown. The shutdown callback will release + * the connection-thread ref. 
*/ + aws_ref_count_release(&connection->ref_count); + aws_http_connection_release(connection->native); } } else { - /* Connection failed its setup, destroy binding now */ - s_connection_destroy(connection); + /* Native connection failed to create. No capsule, no connection thread running. + * Release the sole ref to destroy the binding. */ + aws_ref_count_release(&connection->ref_count); } Py_XDECREF(capsule); @@ -281,6 +277,7 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { } struct http_connection_binding *connection = aws_mem_calloc(allocator, 1, sizeof(struct http_connection_binding)); + aws_ref_count_init(&connection->ref_count, connection, s_connection_destroy); /* From hereon, we need to clean up if errors occur */ struct aws_http2_setting *http2_settings = NULL; size_t http2_settings_count = 0; @@ -388,7 +385,7 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) { aws_mem_release(allocator, http2_settings); } if (!success) { - s_connection_destroy(connection); + aws_ref_count_release(&connection->ref_count); return NULL; } Py_RETURN_NONE; diff --git a/test/test_http_connection_lifetime.py b/test/test_http_connection_lifetime.py new file mode 100644 index 000000000..badbb70b8 --- /dev/null +++ b/test/test_http_connection_lifetime.py @@ -0,0 +1,118 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0. 
+ +import gc +import threading +import unittest +from test import NativeResourceTest +from http.server import HTTPServer, SimpleHTTPRequestHandler +from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup +from awscrt.http import HttpClientConnection, HttpRequest +import awscrt.exceptions + + +class SilentHandler(SimpleHTTPRequestHandler): + def log_message(self, format, *args): + pass + + def do_GET(self): + self.send_response(200, 'OK') + self.send_header('Content-Length', '5') + self.end_headers() + self.wfile.write(b'hello') + + +class TestConnectionLifetime(NativeResourceTest): + """Tests for http_connection_binding ref-count based lifetime management. + + Under free-threaded Python (Py_GIL_DISABLED), the capsule destructor + (application thread) and on_connection_shutdown (event-loop thread) can + race. These tests exercise both orderings and a stress scenario. + """ + hostname = 'localhost' + timeout = 10 + + def setUp(self): + super().setUp() + self.server = HTTPServer((self.hostname, 0), SilentHandler) + self.port = self.server.server_address[1] + self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True) + self.server_thread.start() + + def tearDown(self): + self.server.shutdown() + self.server.server_close() + self.server_thread.join() + super().tearDown() + + def _new_connection(self): + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + future = HttpClientConnection.new( + host_name=self.hostname, + port=self.port, + bootstrap=bootstrap) + return future.result(self.timeout) + + def test_release_before_shutdown(self): + """Capsule destructor fires first, then shutdown callback.""" + connection = self._new_connection() + shutdown_future = connection.shutdown_future + + del connection + gc.collect() + + shutdown_future.result(self.timeout) + + def test_shutdown_before_release(self): + """Shutdown callback 
fires first (via close), then capsule destructor.""" + connection = self._new_connection() + shutdown_future = connection.shutdown_future + + connection.close() + shutdown_future.result(self.timeout) + + del connection + gc.collect() + + def test_concurrent_release_and_shutdown_stress(self): + """Stress: race capsule destructor against shutdown from many threads. + + Under Py_GIL_DISABLED, the old two-bool approach would double-free. + With atomic ref-counting, exactly one path destroys the binding. + """ + iterations = 50 + errors = [] + + def release_connection(conn): + try: + del conn + gc.collect() + except Exception as e: + errors.append(e) + + for i in range(iterations): + try: + connection = self._new_connection() + except awscrt.exceptions.AwsCrtError as e: + if e.name == 'AWS_IO_SOCKET_CONNECTION_REFUSED': + continue + raise + + shutdown_future = connection.shutdown_future + + connection.close() + + t = threading.Thread(target=release_connection, args=(connection,)) + del connection + t.start() + + shutdown_future.result(self.timeout) + t.join(self.timeout) + + self.assertEqual([], errors) + + +if __name__ == '__main__': + unittest.main()