Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 29 additions & 32 deletions source/http_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "io.h"

#include <aws/common/array_list.h>
#include <aws/common/ref_count.h>
#include <aws/http/connection.h>
#include <aws/http/proxy.h>
#include <aws/http/request_response.h>
Expand All @@ -16,20 +17,22 @@ static const char *s_capsule_name_http_connection = "aws_http_connection";

/**
* Lifetime notes:
* - If connect() reports immediate failure, binding can be destroyed.
* - If on_connection_setup reports failure, binding can be destroyed.
* - Otherwise, binding cannot be destroyed until BOTH release() has been called AND on_connection_shutdown has fired.
* - Binding starts with ref_count=1 (owned by whoever will destroy on failure, or by the capsule on success).
* - If on_connection_setup succeeds, acquire another refcount. The additional ref is for the connection
* thread to invoke callbacks (e.g. s_on_connection_shutdown) with a valid binding. The shutdown callback
* is the last callback invoked by the connection thread, so it releases this ref.
* - The last release (ref_count 1→0) calls s_connection_destroy().
*/
struct http_connection_binding {
struct aws_http_connection *native;
/* Reference to the Python object that references other related Python objects, keeping them alive */
PyObject *py_core;

bool release_called;
bool shutdown_called;
struct aws_ref_count ref_count;
};

static void s_connection_destroy(struct http_connection_binding *connection) {
static void s_connection_destroy(void *user_data) {
struct http_connection_binding *connection = user_data;
Py_XDECREF(connection->py_core);

aws_mem_release(aws_py_get_allocator(), connection);
Expand All @@ -40,38 +43,23 @@ struct aws_http_connection *aws_py_get_http_connection(PyObject *connection) {
connection, s_capsule_name_http_connection, "HttpConnectionBase", http_connection_binding);
}

static void s_connection_release(struct http_connection_binding *connection) {
AWS_FATAL_ASSERT(!connection->release_called);
connection->release_called = true;

bool destroy_after_release = connection->shutdown_called;
static void s_connection_capsule_destructor(PyObject *capsule) {
struct http_connection_binding *connection = PyCapsule_GetPointer(capsule, s_capsule_name_http_connection);

aws_http_connection_release(connection->native);

if (destroy_after_release) {
s_connection_destroy(connection);
}
}

static void s_connection_capsule_destructor(PyObject *capsule) {
struct http_connection_binding *connection = PyCapsule_GetPointer(capsule, s_capsule_name_http_connection);
s_connection_release(connection);
aws_ref_count_release(&connection->ref_count);
}

static void s_on_connection_shutdown(struct aws_http_connection *native_connection, int error_code, void *user_data) {
(void)native_connection;
struct http_connection_binding *connection = user_data;
AWS_FATAL_ASSERT(!connection->shutdown_called);

PyGILState_STATE state;
if (aws_py_gilstate_ensure(&state)) {
return; /* Python has shut down. Nothing matters anymore, but don't crash */
}

connection->shutdown_called = true;

bool destroy_after_shutdown = connection->release_called;

/* Invoke on_shutdown, then clear our reference to it */
PyObject *result = PyObject_CallMethod(connection->py_core, "_on_shutdown", "(i)", error_code);

Expand All @@ -82,9 +70,8 @@ static void s_on_connection_shutdown(struct aws_http_connection *native_connecti
PyErr_WriteUnraisable(PyErr_Occurred());
}

if (destroy_after_shutdown) {
s_connection_destroy(connection);
}
/* This is the last callback invoked by the connection thread. Release the connection-thread ref. */
aws_ref_count_release(&connection->ref_count);

PyGILState_Release(state);
}
Expand All @@ -107,6 +94,10 @@ static void s_on_client_connection_setup(
/* If setup was successful, encapsulate binding so we can pass it to python */
PyObject *capsule = NULL;
if (!error_code) {
/* Acquire ref for the connection thread to invoke callbacks with a valid binding.
* The shutdown callback is the last one invoked, and will release this ref. */
aws_ref_count_acquire(&connection->ref_count);

capsule = PyCapsule_New(connection, s_capsule_name_http_connection, s_connection_capsule_destructor);
if (!capsule) {
error_code = AWS_ERROR_UNKNOWN;
Expand All @@ -125,13 +116,18 @@ static void s_on_client_connection_setup(
}

if (native_connection) {
/* Connection exists, but failed to create capsule. Release connection, which eventually destroys binding */
if (!capsule) {
s_connection_release(connection);
/* Native connection exists but capsule creation failed. We won't use the connection as a capsule,
* but the connection thread is still running. Release the initial ref (no capsule to own it),
* then release the native connection to initiate shutdown. The shutdown callback will release
* the connection-thread ref. */
aws_ref_count_release(&connection->ref_count);
aws_http_connection_release(connection->native);
}
} else {
/* Connection failed its setup, destroy binding now */
s_connection_destroy(connection);
/* Native connection failed to create. No capsule, no connection thread running.
* Release the sole ref to destroy the binding. */
aws_ref_count_release(&connection->ref_count);
}

Py_XDECREF(capsule);
Expand Down Expand Up @@ -281,6 +277,7 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) {
}

struct http_connection_binding *connection = aws_mem_calloc(allocator, 1, sizeof(struct http_connection_binding));
aws_ref_count_init(&connection->ref_count, connection, s_connection_destroy);
/* From hereon, we need to clean up if errors occur */
struct aws_http2_setting *http2_settings = NULL;
size_t http2_settings_count = 0;
Expand Down Expand Up @@ -388,7 +385,7 @@ PyObject *aws_py_http_client_connection_new(PyObject *self, PyObject *args) {
aws_mem_release(allocator, http2_settings);
}
if (!success) {
s_connection_destroy(connection);
aws_ref_count_release(&connection->ref_count);
return NULL;
}
Py_RETURN_NONE;
Expand Down
118 changes: 118 additions & 0 deletions test/test_http_connection_lifetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

import gc
import threading
import unittest
from test import NativeResourceTest
from http.server import HTTPServer, SimpleHTTPRequestHandler
from awscrt.io import ClientBootstrap, DefaultHostResolver, EventLoopGroup
from awscrt.http import HttpClientConnection, HttpRequest
import awscrt.exceptions


class SilentHandler(SimpleHTTPRequestHandler):
    """Minimal HTTP handler for the local test server.

    Answers every GET with a fixed 200 response and suppresses the
    per-request log output that the base handler would otherwise emit.
    """

    # Fixed payload returned for every GET. Content-Length is derived from it
    # so the header and the bytes actually written can never disagree.
    _BODY = b'hello'

    def log_message(self, format, *args):
        # Intentionally a no-op: keeps test output free of request logging.
        pass

    def do_GET(self):
        """Reply with ``200 OK`` and the fixed body."""
        body = self._BODY
        self.send_response(200, 'OK')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)


class TestConnectionLifetime(NativeResourceTest):
    """Tests for http_connection_binding ref-count based lifetime management.

    Under free-threaded Python (Py_GIL_DISABLED), the capsule destructor
    (application thread) and on_connection_shutdown (event-loop thread) can
    race. These tests exercise both orderings and a stress scenario.
    """
    hostname = 'localhost'
    # Seconds; used as the upper bound for every blocking wait in these tests.
    timeout = 10

    def setUp(self):
        """Start a local HTTP server on a background thread."""
        super().setUp()
        # Port 0 is passed to the server; the port actually bound is read back
        # from server_address so clients know where to connect.
        self.server = HTTPServer((self.hostname, 0), SilentHandler)
        self.port = self.server.server_address[1]
        self.server_thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.server_thread.start()

    def tearDown(self):
        """Stop the local server and wait for its thread before base-class teardown."""
        self.server.shutdown()
        self.server.server_close()
        self.server_thread.join()
        super().tearDown()

    def _new_connection(self):
        """Open a client connection to the local server.

        Builds a fresh event-loop group, host resolver and bootstrap for each
        call, then blocks (up to ``self.timeout``) on the connection future.

        Returns:
            The result of the ``HttpClientConnection.new`` future.
        """
        event_loop_group = EventLoopGroup()
        host_resolver = DefaultHostResolver(event_loop_group)
        bootstrap = ClientBootstrap(event_loop_group, host_resolver)
        future = HttpClientConnection.new(
            host_name=self.hostname,
            port=self.port,
            bootstrap=bootstrap)
        return future.result(self.timeout)

    def test_release_before_shutdown(self):
        """Capsule destructor fires first, then shutdown callback."""
        connection = self._new_connection()
        # Keep the future so shutdown can still be awaited after the
        # connection object itself has been dropped.
        shutdown_future = connection.shutdown_future

        del connection
        gc.collect()

        shutdown_future.result(self.timeout)

    def test_shutdown_before_release(self):
        """Shutdown callback fires first (via close), then capsule destructor."""
        connection = self._new_connection()
        shutdown_future = connection.shutdown_future

        connection.close()
        shutdown_future.result(self.timeout)

        del connection
        gc.collect()

    def test_concurrent_release_and_shutdown_stress(self):
        """Stress: race capsule destructor against shutdown from many threads.

        Under Py_GIL_DISABLED, the old two-bool approach would double-free.
        With atomic ref-counting, exactly one path destroys the binding.
        """
        iterations = 50
        # Exceptions raised on the release threads are collected here and
        # checked once at the end.
        errors = []

        def release_connection(conn):
            # Runs on a worker thread: drop this thread's reference and force
            # a collection pass, recording any exception for the main thread.
            try:
                del conn
                gc.collect()
            except Exception as e:
                errors.append(e)

        for i in range(iterations):
            try:
                connection = self._new_connection()
            except awscrt.exceptions.AwsCrtError as e:
                # A refused connection skips this iteration; any other
                # failure is a genuine error.
                if e.name == 'AWS_IO_SOCKET_CONNECTION_REFUSED':
                    continue
                raise

            shutdown_future = connection.shutdown_future

            # Begin shutdown, then hand the connection to another thread so
            # its release can overlap with the shutdown callback.
            connection.close()

            t = threading.Thread(target=release_connection, args=(connection,))
            del connection
            t.start()

            shutdown_future.result(self.timeout)
            t.join(self.timeout)
            # join() returns None whether or not it timed out, so a release
            # thread that is stuck must be detected explicitly.
            self.assertFalse(t.is_alive(), 'release thread did not finish within timeout')

        self.assertEqual([], errors)


# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
Loading