From 7c3ed3b7e49da52c4a26fb7935346f72daa3dae8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 10 Apr 2026 11:30:34 +0800 Subject: [PATCH 1/2] Add AutoClusterFailover and ServiceInfoProvider --- pulsar/__init__.py | 204 ++++++++++++++++++++++++++++++++++++++++++- src/client.cc | 98 +++++++++++++++++++-- src/config.cc | 43 ++++++--- tests/pulsar_test.py | 74 ++++++++++++++++ 4 files changed, 398 insertions(+), 21 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 0f60552d..afcb6340 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -569,6 +569,159 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str _check_type(str, method, 'method') self.auth = _pulsar.AuthenticationBasic.create(username, password, method) + +class ServiceInfoProvider: + """ + Base class for Python-defined service discovery and failover providers. + + Subclasses must return the initial :class:`ServiceInfo` and may keep the + provided update callback to push later service changes into the client. + """ + + def initial_service_info(self) -> "ServiceInfo": + raise NotImplementedError + + def initialize(self, on_service_info_update: Callable[["ServiceInfo"], None]) -> None: + raise NotImplementedError + + def close(self) -> None: + """ + Stop background work and release resources. + + This is invoked when the underlying C++ client destroys the provider, + typically during :meth:`Client.close`. + """ + return None + + +class ServiceInfo: + """ + Connection information for one Pulsar cluster endpoint. + + This is primarily used with :class:`AutoClusterFailover`. + """ + + def __init__(self, + service_url: str, + authentication: Optional[Authentication] = None, + tls_trust_certs_file_path: Optional[str] = None): + """ + Create a service info entry. + + Parameters + ---------- + service_url: str + The Pulsar service URL for this cluster. + authentication: Authentication, optional + Authentication to use when connecting to this cluster. + tls_trust_certs_file_path: str, optional + Trust store path for TLS connections to this cluster. + """ + _check_type(str, service_url, 'service_url') + _check_type_or_none(Authentication, authentication, 'authentication') + _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') + + self._authentication = authentication + self._service_info = _pulsar.ServiceInfo( + service_url, + authentication.auth if authentication else None, + tls_trust_certs_file_path, + ) + + @property + def service_url(self) -> str: + return self._service_info.service_url + + @property + def use_tls(self) -> bool: + return self._service_info.use_tls + + @property + def tls_trust_certs_file_path(self) -> Optional[str]: + return self._service_info.tls_trust_certs_file_path + + def __repr__(self) -> str: + return ( + "ServiceInfo(" + f"service_url={self.service_url!r}, " + f"use_tls={self.use_tls!r}, " + f"tls_trust_certs_file_path={self.tls_trust_certs_file_path!r})" + ) + + @classmethod + def wrap(cls, service_info: _pulsar.ServiceInfo): + self = cls.__new__(cls) + self._authentication = None + self._service_info = service_info + return self + + +class AutoClusterFailover: + """ + Cluster-level automatic failover configuration for :class:`Client`. + """ + + def __init__(self, + primary: ServiceInfo, + secondary: List[ServiceInfo], + check_interval_ms: int = 5000, + failover_threshold: int = 1, + switch_back_threshold: int = 1): + """ + Create an automatic failover configuration. + + Parameters + ---------- + primary: ServiceInfo + The preferred cluster to use. + secondary: list[ServiceInfo] + Ordered fallback clusters to probe when the primary becomes unavailable. + check_interval_ms: int, default=5000 + Probe interval in milliseconds. + failover_threshold: int, default=1 + Number of consecutive probe failures required before failover. + switch_back_threshold: int, default=1 + Number of consecutive successful primary probes required before switching back. + """ + _check_type(ServiceInfo, primary, 'primary') + _check_type(list, secondary, 'secondary') + _check_type(int, check_interval_ms, 'check_interval_ms') + _check_type(int, failover_threshold, 'failover_threshold') + _check_type(int, switch_back_threshold, 'switch_back_threshold') + + if not secondary: + raise ValueError("Argument secondary is expected to contain at least one ServiceInfo") + + for index, service_info in enumerate(secondary): + if not isinstance(service_info, ServiceInfo): + raise ValueError( + "Argument secondary[%d] is expected to be of type 'ServiceInfo' and not '%s'" + % (index, type(service_info).__name__) + ) + + if check_interval_ms <= 0: + raise ValueError("Argument check_interval_ms is expected to be greater than 0") + if failover_threshold <= 0: + raise ValueError("Argument failover_threshold is expected to be greater than 0") + if switch_back_threshold <= 0: + raise ValueError("Argument switch_back_threshold is expected to be greater than 0") + + self.primary = primary + self.secondary = list(secondary) + self.check_interval_ms = check_interval_ms + self.failover_threshold = failover_threshold + self.switch_back_threshold = switch_back_threshold + + def __repr__(self) -> str: + return ( + "AutoClusterFailover(" + f"primary={self.primary!r}, " + f"secondary={self.secondary!r}, " + f"check_interval_ms={self.check_interval_ms!r}, " + f"failover_threshold={self.failover_threshold!r}, " + f"switch_back_threshold={self.switch_back_threshold!r})" + ) + class ConsumerDeadLetterPolicy: """ Configuration for the "dead letter queue" feature in consumer. @@ -681,8 +834,9 @@ def __init__(self, service_url, Parameters ---------- - service_url: str - The Pulsar service url eg: pulsar://my-broker.com:6650/ + service_url: str or AutoClusterFailover or ServiceInfoProvider + The Pulsar service URL, for example ``pulsar://my-broker.com:6650/``, or an + :class:`AutoClusterFailover` or :class:`ServiceInfoProvider` configuration. authentication: Authentication, optional Set the authentication provider to be used with the broker. Supported methods: @@ -743,7 +897,26 @@ def __init__(self, service_url, tls_certificate_file_path: str, optional The path to the TLS certificate file. """ - _check_type(str, service_url, 'service_url') + if not isinstance(service_url, (str, AutoClusterFailover, ServiceInfoProvider)): + raise ValueError( + "Argument service_url is expected to be of type 'str', 'AutoClusterFailover' or " + "'ServiceInfoProvider'" + ) + + if isinstance(service_url, (AutoClusterFailover, ServiceInfoProvider)) and authentication is not None: + raise ValueError( + "Argument authentication is not supported when service_url is an AutoClusterFailover or " + "ServiceInfoProvider; set authentication on each ServiceInfo instead" + ) + + if isinstance(service_url, (AutoClusterFailover, ServiceInfoProvider)) and \ + tls_trust_certs_file_path is not None: + raise ValueError( + "Argument tls_trust_certs_file_path is not supported when service_url is an " + "AutoClusterFailover or ServiceInfoProvider; set tls_trust_certs_file_path on each " + "ServiceInfo instead" + ) + _check_type_or_none(Authentication, authentication, 'authentication') _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds') _check_type(int, connection_timeout_ms, 'connection_timeout_ms') @@ -792,7 +965,24 @@ def __init__(self, service_url, conf.tls_private_key_file_path(tls_private_key_file_path) if tls_certificate_file_path is not None: conf.tls_certificate_file_path(tls_certificate_file_path) - self._client = _pulsar.Client(service_url, conf) + if isinstance(service_url, AutoClusterFailover): + self._client = _pulsar.Client.create_auto_cluster_failover( + service_url.primary._service_info, + [service_info._service_info for service_info in service_url.secondary], + service_url.check_interval_ms, + service_url.failover_threshold, + service_url.switch_back_threshold, + conf, + ) + elif isinstance(service_url, ServiceInfoProvider): + try: + self._client = _pulsar.Client.create_service_info_provider(service_url, conf) + except RuntimeError as e: + if str(e) == "Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance": + raise ValueError(str(e)) + raise + else: + self._client = _pulsar.Client(service_url, conf) self._consumers = [] @staticmethod @@ -1417,6 +1607,12 @@ def get_topic_partitions(self, topic): _check_type(str, topic, 'topic') return self._client.get_topic_partitions(topic) + def get_service_info(self) -> ServiceInfo: + """ + Get the current service info used by this client. + """ + return ServiceInfo.wrap(self._client.get_service_info()) + def shutdown(self): """ Perform immediate shutdown of Pulsar client. diff --git a/src/client.cc b/src/client.cc index d77938fc..64a8e7bb 100644 --- a/src/client.cc +++ b/src/client.cc @@ -18,12 +18,70 @@ */ #include "utils.h" +#include +#include +#include +#include #include #include #include namespace py = pybind11; +static ServiceInfo unwrapPythonServiceInfo(const py::handle& object) { + auto serviceInfoObject = py::reinterpret_borrow(object); + + try { + return serviceInfoObject.cast(); + } catch (const py::cast_error&) { + } + + if (py::hasattr(serviceInfoObject, "_service_info")) { + try { + return serviceInfoObject.attr("_service_info").cast(); + } catch (const py::cast_error&) { + } + } + + throw py::value_error("Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance"); +} + +class PythonServiceInfoProvider : public ServiceInfoProvider { + public: + explicit PythonServiceInfoProvider(py::object provider) : provider_(std::move(provider)) {} + + ~PythonServiceInfoProvider() override { + if (!Py_IsInitialized()) { + return; + } + + py::gil_scoped_acquire acquire; + try { + if (py::hasattr(provider_, "close")) { + provider_.attr("close")(); + } + } catch (const py::error_already_set&) { + PyErr_Print(); + } + } + + ServiceInfo initialServiceInfo() override { + py::gil_scoped_acquire acquire; + return unwrapPythonServiceInfo(provider_.attr("initial_service_info")()); + } + + void initialize(std::function onServiceInfoUpdate) override { + py::gil_scoped_acquire acquire; + provider_.attr("initialize")(py::cpp_function( + [onServiceInfoUpdate = std::move(onServiceInfoUpdate)](py::object serviceInfo) mutable { + onServiceInfoUpdate(unwrapPythonServiceInfo(serviceInfo)); + })); + } + + private: + py::object provider_; +}; + Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) { return waitForAsyncValue( [&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); }); @@ -65,7 +123,8 @@ std::vector Client_getTopicPartitions(Client& client, const std::st [&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); }); } -void Client_getTopicPartitionsAsync(Client &client, const std::string& topic, GetPartitionsCallback callback) { +void Client_getTopicPartitionsAsync(Client& client, const std::string& topic, + GetPartitionsCallback callback) { py::gil_scoped_release release; client.getPartitionsForTopicAsync(topic, callback); } @@ -76,6 +135,25 @@ SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_ }); } +std::shared_ptr Client_createAutoClusterFailover(ServiceInfo primary, + std::vector secondary, + int64_t checkIntervalMs, uint32_t failoverThreshold, + uint32_t switchBackThreshold, + const ClientConfiguration& conf) { + AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary), std::move(secondary)); + autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs); + autoClusterFailoverConfig.failoverThreshold = failoverThreshold; + autoClusterFailoverConfig.switchBackThreshold = switchBackThreshold; + return std::make_shared( + Client::create(std::make_unique(std::move(autoClusterFailoverConfig)), conf)); +} + +std::shared_ptr Client_createServiceInfoProvider(py::object provider, + const ClientConfiguration& conf) { + return std::make_shared( + Client::create(std::make_unique(std::move(provider)), conf)); +} + void Client_close(Client& client) { waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); }); } @@ -108,19 +186,25 @@ void Client_subscribeAsync_pattern(Client& client, const std::string& topic_patt void export_client(py::module_& m) { py::class_>(m, "Client") .def(py::init()) + .def_static("create_auto_cluster_failover", &Client_createAutoClusterFailover, py::arg("primary"), + py::arg("secondary"), py::arg("check_interval_ms"), py::arg("failover_threshold"), + py::arg("switch_back_threshold"), py::arg("client_configuration")) + .def_static("create_service_info_provider", &Client_createServiceInfoProvider, py::arg("provider"), + py::arg("client_configuration")) .def("create_producer", &Client_createProducer) .def("create_producer_async", &Client_createProducerAsync) .def("subscribe", &Client_subscribe) .def("subscribe_topics", &Client_subscribe_topics) .def("subscribe_pattern", &Client_subscribe_pattern) .def("create_reader", &Client_createReader) - .def("create_table_view", [](Client& client, const std::string& topic, - const TableViewConfiguration& config) { - return waitForAsyncValue([&](TableViewCallback callback) { - client.createTableViewAsync(topic, config, callback); - }); - }) + .def("create_table_view", + [](Client& client, const std::string& topic, const TableViewConfiguration& config) { + return waitForAsyncValue([&](TableViewCallback callback) { + client.createTableViewAsync(topic, config, callback); + }); + }) .def("get_topic_partitions", &Client_getTopicPartitions) + .def("get_service_info", &Client::getServiceInfo) .def("get_schema_info", &Client_getSchemaInfo) .def("close", &Client_close) .def("close_async", &Client_closeAsync) diff --git a/src/config.cc b/src/config.cc index 4c5557d7..ec16b525 100644 --- a/src/config.cc +++ b/src/config.cc @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -135,16 +136,36 @@ static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguratio return conf; } +static ServiceInfo ServiceInfo_init(const std::string& serviceUrl, AuthenticationPtr authentication, + std::optional tlsTrustCertsFilePath) { + return ServiceInfo(serviceUrl, authentication ? std::move(authentication) : AuthFactory::Disabled(), + std::move(tlsTrustCertsFilePath)); +} + void export_config(py::module_& m) { using namespace py; + class_(m, "ServiceInfo") + .def(init(&ServiceInfo_init), arg("service_url"), arg("authentication") = nullptr, + arg("tls_trust_certs_file_path") = py::none()) + .def_property_readonly("service_url", + [](const ServiceInfo& serviceInfo) { return serviceInfo.serviceUrl(); }) + .def_property_readonly("use_tls", &ServiceInfo::useTls) + .def_property_readonly("tls_trust_certs_file_path", [](const ServiceInfo& serviceInfo) { + return serviceInfo.tlsTrustCertsFilePath(); + }); + class_>(m, "KeySharedPolicy") .def(init<>()) .def("set_key_shared_mode", &KeySharedPolicy::setKeySharedMode, return_value_policy::reference) .def("get_key_shared_mode", &KeySharedPolicy::getKeySharedMode) - .def("set_allow_out_of_order_delivery", &KeySharedPolicy::setAllowOutOfOrderDelivery, return_value_policy::reference) + .def("set_allow_out_of_order_delivery", &KeySharedPolicy::setAllowOutOfOrderDelivery, + return_value_policy::reference) .def("is_allow_out_of_order_delivery", &KeySharedPolicy::isAllowOutOfOrderDelivery) - .def("set_sticky_ranges", static_cast(&KeySharedPolicy::setStickyRanges), return_value_policy::reference) + .def("set_sticky_ranges", + static_cast( + &KeySharedPolicy::setStickyRanges), + return_value_policy::reference) .def("get_sticky_ranges", &KeySharedPolicy::getStickyRanges); class_>(m, "AbstractCryptoKeyReader") @@ -266,7 +287,8 @@ void export_config(py::module_& m) { .def(init<>()) .def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, return_value_policy::reference) .def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, return_value_policy::reference) - .def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, return_value_policy::reference) + .def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, + return_value_policy::reference) .def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference) .def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference); @@ -305,7 +327,8 @@ void export_config(py::module_& m) { .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) .def("regex_subscription_mode", &ConsumerConfiguration::setRegexSubscriptionMode) - .def("regex_subscription_mode", &ConsumerConfiguration::getRegexSubscriptionMode, return_value_policy::reference) + .def("regex_subscription_mode", &ConsumerConfiguration::getRegexSubscriptionMode, + return_value_policy::reference) .def("crypto_key_reader", &ConsumerConfiguration::setCryptoKeyReader, return_value_policy::reference) .def("replicate_subscription_state_enabled", &ConsumerConfiguration::setReplicateSubscriptionStateEnabled) @@ -328,9 +351,9 @@ void export_config(py::module_& m) { .def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy) .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy) .def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction, - return_value_policy::copy) + return_value_policy::copy) .def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction, - return_value_policy::reference); + return_value_policy::reference); class_>(m, "ReaderConfiguration") .def(init<>()) @@ -348,9 +371,9 @@ void export_config(py::module_& m) { .def("read_compacted", &ReaderConfiguration::setReadCompacted) .def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference) .def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive) - .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference) - .def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction, - return_value_policy::copy) + .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, + return_value_policy::reference) + .def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction, return_value_policy::copy) .def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction, - return_value_policy::reference); + return_value_policy::reference); } diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index b7f38edd..be817c5d 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -45,6 +45,9 @@ ConsumerBatchReceivePolicy, ProducerAccessMode, ConsumerDeadLetterPolicy, + ServiceInfoProvider, + ServiceInfo, + AutoClusterFailover, ) from pulsar.schema import JsonSchema, Record, Integer @@ -95,6 +98,30 @@ class PulsarTest(TestCase): serviceUrlTls = "pulsar+ssl://localhost:6651" + class StaticServiceInfoProvider(ServiceInfoProvider): + + def __init__(self, initial_service_info): + self.initial = initial_service_info + self.callback = None + self.closed = False + + def initial_service_info(self): + return self.initial + + def initialize(self, on_service_info_update): + self.callback = on_service_info_update + + def close(self): + self.closed = True + + class InvalidServiceInfoProvider(ServiceInfoProvider): + + def initial_service_info(self): + return "invalid" + + def initialize(self, on_service_info_update): + pass + def test_producer_config(self): conf = ProducerConfiguration() conf.send_timeout_millis(12) @@ -934,6 +961,53 @@ def test_client_argument_errors(self): self._check_value_error(lambda: Client(self.serviceUrl, tls_trust_certs_file_path=5)) self._check_value_error(lambda: Client(self.serviceUrl, tls_allow_insecure_connection="test")) + def test_service_info_argument_errors(self): + self._check_value_error(lambda: ServiceInfo(None)) + self._check_value_error(lambda: ServiceInfo(self.serviceUrl, authentication="test")) + self._check_value_error(lambda: ServiceInfo(self.serviceUrl, tls_trust_certs_file_path=5)) + + def test_auto_cluster_failover_argument_errors(self): + primary = ServiceInfo(self.serviceUrl) + secondary = [ServiceInfo("pulsar://192.0.2.1:6650")] + + self._check_value_error(lambda: AutoClusterFailover("test", secondary)) + self._check_value_error(lambda: AutoClusterFailover(primary, "test")) + self._check_value_error(lambda: AutoClusterFailover(primary, [])) + self._check_value_error(lambda: AutoClusterFailover(primary, ["test"])) + self._check_value_error(lambda: AutoClusterFailover(primary, secondary, check_interval_ms=0)) + self._check_value_error(lambda: AutoClusterFailover(primary, secondary, failover_threshold=0)) + self._check_value_error(lambda: AutoClusterFailover(primary, secondary, switch_back_threshold=0)) + self._check_value_error(lambda: Client(AutoClusterFailover(primary, secondary), + authentication=AuthenticationToken("token"))) + self._check_value_error(lambda: Client(AutoClusterFailover(primary, secondary), + tls_trust_certs_file_path=CERTS_DIR + "cacert.pem")) + self._check_value_error(lambda: Client(self.StaticServiceInfoProvider(primary), + authentication=AuthenticationToken("token"))) + self._check_value_error(lambda: Client(self.StaticServiceInfoProvider(primary), + tls_trust_certs_file_path=CERTS_DIR + "cacert.pem")) + self._check_value_error(lambda: Client(self.InvalidServiceInfoProvider())) + + def test_auto_cluster_failover_client(self): + primary = ServiceInfo(self.serviceUrl) + secondary = [ServiceInfo("pulsar://192.0.2.1:6650")] + client = Client(AutoClusterFailover(primary, secondary, check_interval_ms=100)) + self.assertEqual(client.get_service_info().service_url, self.serviceUrl) + client.close() + + def test_service_info_provider_client(self): + primary = ServiceInfo(self.serviceUrl) + secondary = ServiceInfo("pulsar://192.0.2.1:6650") + provider = self.StaticServiceInfoProvider(primary) + + client = Client(provider) + self.assertEqual(client.get_service_info().service_url, self.serviceUrl) + + provider.callback(secondary) + self.assertEqual(client.get_service_info().service_url, secondary.service_url) + + client.close() + self.assertTrue(provider.closed) + def test_producer_argument_errors(self): client = Client(self.serviceUrl) From 3f4161f7862c82a87435a318a223f65f848f7d9d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 10 Apr 2026 15:43:08 +0800 Subject: [PATCH 2/2] add integration test --- tests/auto_cluster_failover_test.py | 260 ++++++++++++++++++++++++++++ tests/run-unit-tests.sh | 3 + 2 files changed, 263 insertions(+) create mode 100644 tests/auto_cluster_failover_test.py diff --git a/tests/auto_cluster_failover_test.py b/tests/auto_cluster_failover_test.py new file mode 100644 index 00000000..168ae0d0 --- /dev/null +++ b/tests/auto_cluster_failover_test.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import shutil +import time +from unittest import SkipTest, TestCase, main +from urllib.error import URLError +from urllib.request import Request, urlopen + +import pulsar +from pulsar import AutoClusterFailover, Client, MessageId, ServiceInfo + +try: + from testcontainers.core.container import DockerContainer +except ImportError: + DockerContainer = None + + +class AutoClusterFailoverDockerTest(TestCase): + + PRIMARY_URL = "pulsar://localhost:16650" + PRIMARY_ADMIN_URL = "http://localhost:18080" + SECONDARY_URL = "pulsar://localhost:26650" + SECONDARY_ADMIN_URL = "http://localhost:28080" + RECEIVE_TIMEOUT_MS = 10000 + FAILOVER_WAIT_SECONDS = 30 + + @classmethod + def setUpClass(cls): + if shutil.which("docker") is None: + raise SkipTest("docker is required for auto_cluster_failover_test") + if DockerContainer is None: + raise SkipTest("testcontainers is required for auto_cluster_failover_test") + + try: + cls.primary_container = cls._create_container( + service_port=16650, + admin_port=18080, + cluster_name="standalone-0", + ) + cls.secondary_container = cls._create_container( + service_port=26650, + admin_port=28080, + cluster_name="standalone-1", + ) + cls.primary_container.start() + cls.secondary_container.start() + cls._wait_for_http(cls.PRIMARY_ADMIN_URL + "/metrics") + cls._wait_for_http(cls.SECONDARY_ADMIN_URL + "/metrics") + cls._configure_cluster( + cls.PRIMARY_ADMIN_URL, + cls.PRIMARY_URL, + "standalone-0", + ) + cls._configure_cluster( + cls.SECONDARY_ADMIN_URL, + cls.SECONDARY_URL, + "standalone-1", + ) + except Exception: + cls._print_container_logs() + raise + + @classmethod + def tearDownClass(cls): + for container in ( + getattr(cls, "primary_container", None), + getattr(cls, "secondary_container", None), + ): + if container is not None: + container.stop() + + @classmethod + def _create_container(cls, service_port, admin_port, cluster_name): + return ( + DockerContainer("apachepulsar/pulsar:latest") + .with_env("clusterName", cluster_name) + .with_env("advertisedAddress", "localhost") + .with_env("advertisedListeners", f"external:pulsar://localhost:{service_port}") + .with_env("PULSAR_MEM", "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m") + .with_bind_ports(6650, service_port) + .with_bind_ports(8080, admin_port) + .with_command( + 'bash -c "bin/apply-config-from-env.py conf/standalone.conf && ' + 'exec bin/pulsar standalone -nss -nfw"' + ) + ) + + @classmethod + def _print_container_logs(cls): + for container in ( + getattr(cls, "primary_container", None), + getattr(cls, "secondary_container", None), + ): + if container is None: + continue + wrapped = container.get_wrapped_container() + try: + print(wrapped.logs().decode("utf-8", errors="replace")) + except Exception: + pass + + @classmethod + def _wait_for_http(cls, url, timeout_seconds=180): + deadline = time.time() + timeout_seconds + last_error = None + while time.time() < deadline: + try: + with urlopen(url, timeout=5) as response: + if response.status == 200: + return + except (URLError, OSError) as e: + last_error = e + time.sleep(1) + raise AssertionError(f"Timed out waiting for {url}: {last_error}") + + @staticmethod + def _http_put(url, data): + request = Request(url, data.encode("utf-8")) + request.add_header("Content-Type", "application/json") + request.get_method = lambda: "PUT" + try: + with urlopen(request, timeout=10): + return + except URLError as e: + if "409" in str(e): + return + raise + + @classmethod + def _configure_cluster(cls, admin_url, service_url, cluster_name): + cls._http_put( + f"{admin_url}/admin/v2/clusters/{cluster_name}", + """ + { + "serviceUrl": "%s/", + "brokerServiceUrl": "%s/" + } + """ % (admin_url, service_url), + ) + cls._http_put( + f"{admin_url}/admin/v2/tenants/public", + """ + { + "adminRoles": ["anonymous"], + "allowedClusters": ["%s"] + } + """ % cluster_name, + ) + cls._http_put( + f"{admin_url}/admin/v2/namespaces/public/default", + """ + { + "replication_clusters": ["%s"] + } + """ % cluster_name, + ) + + @staticmethod + def _wait_until(predicate, timeout_seconds, description): + deadline = time.time() + timeout_seconds + while time.time() < deadline: + if predicate(): + return + time.sleep(0.2) + raise AssertionError(f"Timed out waiting for {description}") + + @staticmethod + def _ensure_topic_exists(service_url, topic): + client = Client(service_url) + producer = client.create_producer(topic) + producer.close() + client.close() + + def test_producer_failover_between_two_standalones(self): + topic = "test-auto-cluster-failover-%d" % int(time.time() * 1000) + message_before_failover = b"before-failover" + message_after_failover = b"after-failover" + + self._ensure_topic_exists(self.PRIMARY_URL, topic) + self._ensure_topic_exists(self.SECONDARY_URL, topic) + + primary_client = Client(self.PRIMARY_URL) + primary_reader = primary_client.create_reader(topic, MessageId.earliest) + + secondary_client = Client(self.SECONDARY_URL) + secondary_reader = secondary_client.create_reader(topic, MessageId.earliest) + + failover_client = Client( + AutoClusterFailover( + ServiceInfo(self.PRIMARY_URL), + [ServiceInfo(self.SECONDARY_URL)], + check_interval_ms=200, + failover_threshold=1, + switch_back_threshold=1, + ), + operation_timeout_seconds=10, + ) + producer = failover_client.create_producer( + topic, + send_timeout_millis=3000, + batching_enabled=False, + ) + + self.assertEqual(failover_client.get_service_info().service_url, self.PRIMARY_URL) + + producer.send(message_before_failover) + self.assertEqual(primary_reader.read_next(self.RECEIVE_TIMEOUT_MS).data(), message_before_failover) + + primary_reader.close() + primary_client.close() + + self.primary_container.get_wrapped_container().kill(signal="SIGTERM") + + self._wait_until( + lambda: failover_client.get_service_info().service_url == self.SECONDARY_URL, + self.FAILOVER_WAIT_SECONDS, + "client service info to switch to the secondary broker", + ) + + last_error = None + deadline = time.time() + self.FAILOVER_WAIT_SECONDS + while time.time() < deadline: + try: + producer.send(message_after_failover) + break + except pulsar.PulsarException as e: + last_error = e + time.sleep(0.5) + else: + raise AssertionError(f"Producer did not recover after failover: {last_error}") + + self.assertEqual(secondary_reader.read_next(self.RECEIVE_TIMEOUT_MS).data(), message_after_failover) + self.assertEqual(failover_client.get_service_info().service_url, self.SECONDARY_URL) + + producer.close() + failover_client.close() + secondary_reader.close() + secondary_client.close() + + +if __name__ == "__main__": + main() diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh index 8d7600d3..587d6c61 100755 --- a/tests/run-unit-tests.sh +++ b/tests/run-unit-tests.sh @@ -23,9 +23,12 @@ set -e -x ROOT_DIR=$(git rev-parse --show-toplevel) cd $ROOT_DIR/tests +python3 -m pip install testcontainers + python3 custom_logger_test.py python3 debug_logger_test.py python3 interrupted_test.py +python3 auto_cluster_failover_test.py python3 pulsar_test.py python3 schema_test.py python3 table_view_test.py