Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions packages/gapic-generator/tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import os
import pytest
import pytest_asyncio
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
Comment on lines +21 to +22

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If the suggestion to use cert_verify in HostNameIgnoringAdapter is applied, PoolManager is no longer needed and its import can be removed.

Suggested change
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
from requests.adapters import HTTPAdapter


from typing import Sequence, Tuple

Expand Down Expand Up @@ -328,7 +330,7 @@ def _read_response_metadata_stream(self):
def intercept_unary_unary(self, continuation, client_call_details, request):
self._add_request_metadata(client_call_details)
response = continuation(client_call_details, request)
metadata = [(k, str(v)) for k, v in response.trailing_metadata()]
metadata = [(k, str(v)) for k, v in response.initial_metadata()] + [(k, str(v)) for k, v in response.trailing_metadata()]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In gRPC Python, initial_metadata() and trailing_metadata() can return None in certain circumstances (e.g., if the call failed or in mock/test environments). To prevent a TypeError when iterating or concatenating, add defensive None checks.

Suggested change
metadata = [(k, str(v)) for k, v in response.initial_metadata()] + [(k, str(v)) for k, v in response.trailing_metadata()]
initial_metadata = response.initial_metadata() or []
trailing_metadata = response.trailing_metadata() or []
metadata = [(k, str(v)) for k, v in initial_metadata] + [(k, str(v)) for k, v in trailing_metadata]
References
  1. Specifically enforce defensive programming: for languages that support nullable references (e.g., Go, Python, Java), ensure appropriate null/nil/None checks or other language-idiomatic guards exist before object property accesses.

self.response_metadata = metadata
return response

Expand Down Expand Up @@ -453,36 +455,64 @@ async def intercepted_echo_grpc_async():
return EchoAsyncClient(transport=transport), interceptor


class HostNameIgnoringAdapter(HTTPAdapter):
"""Custom HTTPAdapter that disables hostname verification for local self-signed certs."""
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
self.poolmanager = PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
assert_hostname=False,
**pool_kwargs
)
Comment on lines +458 to +467

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In urllib3 v2.0+, the assert_hostname parameter has been removed from PoolManager, which will cause a TypeError at runtime. Instead of overriding init_poolmanager, you can override cert_verify on the HTTPAdapter to disable hostname verification in a way that is compatible with both urllib3 1.x and 2.x.

class HostNameIgnoringAdapter(HTTPAdapter):
    """Custom HTTPAdapter that disables hostname verification for local self-signed certs."""
    def cert_verify(self, conn, url, verify, cert):
        super().cert_verify(conn, url, verify, cert)
        conn.assert_hostname = False



@pytest.fixture
def intercepted_echo_rest():
def intercepted_echo_rest(use_mtls):
transport_name = "rest"
transport_cls = EchoClient.get_transport_class(transport_name)
interceptor = EchoMetadataClientRestInterceptor()

# The custom host explicitly bypasses https.
url_scheme = "https" if use_mtls else "http"
transport = transport_cls(
credentials=ga_credentials.AnonymousCredentials(),
host="localhost:7469",
url_scheme="http",
url_scheme=url_scheme,
interceptor=interceptor,
)
if use_mtls:
dir = os.path.dirname(__file__)
cert_path = os.path.join(dir, "../cert/mtls.crt")
key_path = os.path.join(dir, "../cert/mtls.key")
Comment on lines +484 to +486

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid using dir as a variable name because it shadows the Python built-in dir() function. Rename it to something more descriptive like current_dir or base_dir.

Suggested change
dir = os.path.dirname(__file__)
cert_path = os.path.join(dir, "../cert/mtls.crt")
key_path = os.path.join(dir, "../cert/mtls.key")
current_dir = os.path.dirname(__file__)
cert_path = os.path.join(current_dir, "../cert/mtls.crt")
key_path = os.path.join(current_dir, "../cert/mtls.key")

transport._session.verify = cert_path
transport._session.cert = (cert_path, key_path)
transport._session.mount("https://", HostNameIgnoringAdapter())

return EchoClient(transport=transport), interceptor


@pytest.fixture
def intercepted_echo_rest_async():
def intercepted_echo_rest_async(use_mtls):
if not HAS_ASYNC_REST_ECHO_TRANSPORT:
pytest.skip("Skipping test with async rest.")

transport_name = "rest_asyncio"
transport_cls = EchoAsyncClient.get_transport_class(transport_name)
interceptor = EchoMetadataClientRestAsyncInterceptor()

# The custom host explicitly bypasses https.
url_scheme = "https" if use_mtls else "http"
transport = transport_cls(
credentials=async_anonymous_credentials(),
host="localhost:7469",
url_scheme="http",
url_scheme=url_scheme,
interceptor=interceptor,
)
if use_mtls:
dir = os.path.dirname(__file__)
cert_path = os.path.join(dir, "../cert/mtls.crt")
key_path = os.path.join(dir, "../cert/mtls.key")
Comment on lines +511 to +513

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Avoid using dir as a variable name because it shadows the Python built-in dir() function. Rename it to something more descriptive like current_dir or base_dir.

Suggested change
dir = os.path.dirname(__file__)
cert_path = os.path.join(dir, "../cert/mtls.crt")
key_path = os.path.join(dir, "../cert/mtls.key")
current_dir = os.path.dirname(__file__)
cert_path = os.path.join(current_dir, "../cert/mtls.crt")
key_path = os.path.join(current_dir, "../cert/mtls.key")

transport._session.verify = cert_path
transport._session.cert = (cert_path, key_path)
transport._session.mount("https://", HostNameIgnoringAdapter())

return EchoAsyncClient(transport=transport), interceptor
52 changes: 52 additions & 0 deletions packages/gapic-generator/tests/system/test_pqc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from google import showcase

@pytest.fixture
def run_pqc_test(use_mtls):
if not use_mtls:
pytest.skip("PQC integration test requires mTLS (--mtls flag) to be enabled.")

@pytest.mark.parametrize(
"transport_fixture",
["intercepted_echo_grpc", "intercepted_echo_rest"]
)
def test_pqc_negotiated_group(run_pqc_test, request, transport_fixture):
"""Verifies that the generated client library negotiates PQC with the Showcase server."""
client, interceptor = request.getfixturevalue(transport_fixture)

# Make secure call using the standard client library fixture
response = client.echo(request=showcase.EchoRequest(content="Verify PQC connection."))
assert response.content == "Verify PQC connection."

# Extract negotiated group and supported groups from response headers
negotiated_group = None
supported_groups = None
for key, value in interceptor.response_metadata:
if key.lower() == "x-showcase-tls-group":
negotiated_group = value
elif key.lower() == "x-showcase-tls-client-supported-groups":
supported_groups = value
Comment on lines +38 to +42

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Accessing interceptor.response_metadata directly can raise an AttributeError or TypeError if the attribute is not set or is None (e.g., if the call failed or if the interceptor does not support it). Use getattr with a default value to safely access it.

Suggested change
for key, value in interceptor.response_metadata:
if key.lower() == "x-showcase-tls-group":
negotiated_group = value
elif key.lower() == "x-showcase-tls-client-supported-groups":
supported_groups = value
response_metadata = getattr(interceptor, "response_metadata", None) or []
for key, value in response_metadata:
if key.lower() == "x-showcase-tls-group":
negotiated_group = value
elif key.lower() == "x-showcase-tls-client-supported-groups":
supported_groups = value
References
  1. Specifically enforce defensive programming: for languages that support nullable references (e.g., Go, Python, Java), ensure appropriate null/nil/None checks or other language-idiomatic guards exist before object property accesses.


assert negotiated_group is not None, "Failed: Showcase server did not return negotiated TLS group header."
assert supported_groups is not None, "Failed: Showcase server did not return client advertised supported groups."

print(f"\n[PQC Verification] ({transport_fixture}) Negotiated TLS Group: {negotiated_group}")
print(f"[PQC Verification] ({transport_fixture}) Client Advertised Supported Groups: {supported_groups}")

# Enforce PQC compliance (this will fail if not using MLKEM/Kyber)
assert "MLKEM" in negotiated_group or "Kyber" in negotiated_group, \
f"Failed: {transport_fixture} Connection is NOT PQC-compliant! Negotiated: {negotiated_group}"
Loading