Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 200 additions & 4 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,159 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str
_check_type(str, method, 'method')
self.auth = _pulsar.AuthenticationBasic.create(username, password, method)


class ServiceInfoProvider:
"""
Base class for Python-defined service discovery and failover providers.

Subclasses must return the initial :class:`ServiceInfo` and may keep the
provided update callback to push later service changes into the client.
"""

def initial_service_info(self) -> "ServiceInfo":
raise NotImplementedError

def initialize(self, on_service_info_update: Callable[["ServiceInfo"], None]) -> None:
raise NotImplementedError

def close(self) -> None:
"""
Stop background work and release resources.

This is invoked when the underlying C++ client destroys the provider,
typically during :meth:`Client.close`.
"""
return None


class ServiceInfo:
"""
Connection information for one Pulsar cluster endpoint.

This is primarily used with :class:`AutoClusterFailover`.
"""

def __init__(self,
service_url: str,
authentication: Optional[Authentication] = None,
tls_trust_certs_file_path: Optional[str] = None):
"""
Create a service info entry.

Parameters
----------
service_url: str
The Pulsar service URL for this cluster.
authentication: Authentication, optional
Authentication to use when connecting to this cluster.
tls_trust_certs_file_path: str, optional
Trust store path for TLS connections to this cluster.
"""
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')

self._authentication = authentication
self._service_info = _pulsar.ServiceInfo(
service_url,
authentication.auth if authentication else None,
tls_trust_certs_file_path,
)

@property
def service_url(self) -> str:
return self._service_info.service_url

@property
def use_tls(self) -> bool:
return self._service_info.use_tls

@property
def tls_trust_certs_file_path(self) -> Optional[str]:
return self._service_info.tls_trust_certs_file_path

def __repr__(self) -> str:
return (
"ServiceInfo("
f"service_url={self.service_url!r}, "
f"use_tls={self.use_tls!r}, "
f"tls_trust_certs_file_path={self.tls_trust_certs_file_path!r})"
)

@classmethod
def wrap(cls, service_info: _pulsar.ServiceInfo):
self = cls.__new__(cls)
self._authentication = None
self._service_info = service_info
return self


class AutoClusterFailover:
"""
Cluster-level automatic failover configuration for :class:`Client`.
"""

def __init__(self,
primary: ServiceInfo,
secondary: List[ServiceInfo],
check_interval_ms: int = 5000,
failover_threshold: int = 1,
switch_back_threshold: int = 1):
"""
Create an automatic failover configuration.

Parameters
----------
primary: ServiceInfo
The preferred cluster to use.
secondary: list[ServiceInfo]
Ordered fallback clusters to probe when the primary becomes unavailable.
check_interval_ms: int, default=5000
Probe interval in milliseconds.
failover_threshold: int, default=1
Number of consecutive probe failures required before failover.
switch_back_threshold: int, default=1
Number of consecutive successful primary probes required before switching back.
"""
_check_type(ServiceInfo, primary, 'primary')
_check_type(list, secondary, 'secondary')
_check_type(int, check_interval_ms, 'check_interval_ms')
_check_type(int, failover_threshold, 'failover_threshold')
_check_type(int, switch_back_threshold, 'switch_back_threshold')

if not secondary:
raise ValueError("Argument secondary is expected to contain at least one ServiceInfo")

for index, service_info in enumerate(secondary):
if not isinstance(service_info, ServiceInfo):
raise ValueError(
"Argument secondary[%d] is expected to be of type 'ServiceInfo' and not '%s'"
% (index, type(service_info).__name__)
)

if check_interval_ms <= 0:
raise ValueError("Argument check_interval_ms is expected to be greater than 0")
if failover_threshold <= 0:
raise ValueError("Argument failover_threshold is expected to be greater than 0")
if switch_back_threshold <= 0:
raise ValueError("Argument switch_back_threshold is expected to be greater than 0")

self.primary = primary
self.secondary = list(secondary)
self.check_interval_ms = check_interval_ms
self.failover_threshold = failover_threshold
self.switch_back_threshold = switch_back_threshold

def __repr__(self) -> str:
return (
"AutoClusterFailover("
f"primary={self.primary!r}, "
f"secondary={self.secondary!r}, "
f"check_interval_ms={self.check_interval_ms!r}, "
f"failover_threshold={self.failover_threshold!r}, "
f"switch_back_threshold={self.switch_back_threshold!r})"
)

class ConsumerDeadLetterPolicy:
"""
Configuration for the "dead letter queue" feature in consumer.
Expand Down Expand Up @@ -681,8 +834,9 @@ def __init__(self, service_url,
Parameters
----------

service_url: str
The Pulsar service url eg: pulsar://my-broker.com:6650/
service_url: str or AutoClusterFailover or ServiceInfoProvider
The Pulsar service URL, for example ``pulsar://my-broker.com:6650/``, or an
:class:`AutoClusterFailover` or :class:`ServiceInfoProvider` configuration.
authentication: Authentication, optional
Set the authentication provider to be used with the broker. Supported methods:

Expand Down Expand Up @@ -743,7 +897,26 @@ def __init__(self, service_url,
tls_certificate_file_path: str, optional
The path to the TLS certificate file.
"""
_check_type(str, service_url, 'service_url')
if not isinstance(service_url, (str, AutoClusterFailover, ServiceInfoProvider)):
raise ValueError(
"Argument service_url is expected to be of type 'str', 'AutoClusterFailover' or "
"'ServiceInfoProvider'"
)

if isinstance(service_url, (AutoClusterFailover, ServiceInfoProvider)) and authentication is not None:
raise ValueError(
"Argument authentication is not supported when service_url is an AutoClusterFailover or "
"ServiceInfoProvider; set authentication on each ServiceInfo instead"
)

if isinstance(service_url, (AutoClusterFailover, ServiceInfoProvider)) and \
tls_trust_certs_file_path is not None:
raise ValueError(
"Argument tls_trust_certs_file_path is not supported when service_url is an "
"AutoClusterFailover or ServiceInfoProvider; set tls_trust_certs_file_path on each "
"ServiceInfo instead"
)

_check_type_or_none(Authentication, authentication, 'authentication')
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
Expand Down Expand Up @@ -792,7 +965,24 @@ def __init__(self, service_url,
conf.tls_private_key_file_path(tls_private_key_file_path)
if tls_certificate_file_path is not None:
conf.tls_certificate_file_path(tls_certificate_file_path)
self._client = _pulsar.Client(service_url, conf)
if isinstance(service_url, AutoClusterFailover):
self._client = _pulsar.Client.create_auto_cluster_failover(
service_url.primary._service_info,
[service_info._service_info for service_info in service_url.secondary],
service_url.check_interval_ms,
service_url.failover_threshold,
service_url.switch_back_threshold,
conf,
)
elif isinstance(service_url, ServiceInfoProvider):
try:
self._client = _pulsar.Client.create_service_info_provider(service_url, conf)
except RuntimeError as e:
if str(e) == "Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance":
raise ValueError(str(e))
raise
else:
self._client = _pulsar.Client(service_url, conf)
self._consumers = []

@staticmethod
Expand Down Expand Up @@ -1417,6 +1607,12 @@ def get_topic_partitions(self, topic):
_check_type(str, topic, 'topic')
return self._client.get_topic_partitions(topic)

def get_service_info(self) -> ServiceInfo:
"""
Get the current service info used by this client.
"""
return ServiceInfo.wrap(self._client.get_service_info())

def shutdown(self):
"""
Perform immediate shutdown of Pulsar client.
Expand Down
98 changes: 91 additions & 7 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,70 @@
*/
#include "utils.h"

#include <pulsar/AutoClusterFailover.h>
#include <pulsar/ServiceInfoProvider.h>
#include <chrono>
#include <memory>
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

namespace py = pybind11;

static ServiceInfo unwrapPythonServiceInfo(const py::handle& object) {
auto serviceInfoObject = py::reinterpret_borrow<py::object>(object);

try {
return serviceInfoObject.cast<ServiceInfo>();
} catch (const py::cast_error&) {
}

if (py::hasattr(serviceInfoObject, "_service_info")) {
try {
return serviceInfoObject.attr("_service_info").cast<ServiceInfo>();
} catch (const py::cast_error&) {
}
}

throw py::value_error("Expected a pulsar.ServiceInfo or _pulsar.ServiceInfo instance");
}

class PythonServiceInfoProvider : public ServiceInfoProvider {
public:
explicit PythonServiceInfoProvider(py::object provider) : provider_(std::move(provider)) {}

~PythonServiceInfoProvider() override {
if (!Py_IsInitialized()) {
return;
}

py::gil_scoped_acquire acquire;
try {
if (py::hasattr(provider_, "close")) {
provider_.attr("close")();
}
} catch (const py::error_already_set&) {
PyErr_Print();
}
}

ServiceInfo initialServiceInfo() override {
py::gil_scoped_acquire acquire;
return unwrapPythonServiceInfo(provider_.attr("initial_service_info")());
}

void initialize(std::function<void(ServiceInfo)> onServiceInfoUpdate) override {
py::gil_scoped_acquire acquire;
provider_.attr("initialize")(py::cpp_function(
[onServiceInfoUpdate = std::move(onServiceInfoUpdate)](py::object serviceInfo) mutable {
onServiceInfoUpdate(unwrapPythonServiceInfo(serviceInfo));
}));
}

private:
py::object provider_;
};

Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
return waitForAsyncValue<Producer>(
[&](CreateProducerCallback callback) { client.createProducerAsync(topic, conf, callback); });
Expand Down Expand Up @@ -65,7 +123,8 @@ std::vector<std::string> Client_getTopicPartitions(Client& client, const std::st
[&](GetPartitionsCallback callback) { client.getPartitionsForTopicAsync(topic, callback); });
}

void Client_getTopicPartitionsAsync(Client &client, const std::string& topic, GetPartitionsCallback callback) {
void Client_getTopicPartitionsAsync(Client& client, const std::string& topic,
GetPartitionsCallback callback) {
py::gil_scoped_release release;
client.getPartitionsForTopicAsync(topic, callback);
}
Expand All @@ -76,6 +135,25 @@ SchemaInfo Client_getSchemaInfo(Client& client, const std::string& topic, int64_
});
}

std::shared_ptr<Client> Client_createAutoClusterFailover(ServiceInfo primary,
std::vector<ServiceInfo> secondary,
int64_t checkIntervalMs, uint32_t failoverThreshold,
uint32_t switchBackThreshold,
const ClientConfiguration& conf) {
AutoClusterFailover::Config autoClusterFailoverConfig(std::move(primary), std::move(secondary));
autoClusterFailoverConfig.checkInterval = std::chrono::milliseconds(checkIntervalMs);
autoClusterFailoverConfig.failoverThreshold = failoverThreshold;
autoClusterFailoverConfig.switchBackThreshold = switchBackThreshold;
return std::make_shared<Client>(
Client::create(std::make_unique<AutoClusterFailover>(std::move(autoClusterFailoverConfig)), conf));
}

std::shared_ptr<Client> Client_createServiceInfoProvider(py::object provider,
const ClientConfiguration& conf) {
return std::make_shared<Client>(
Client::create(std::make_unique<PythonServiceInfoProvider>(std::move(provider)), conf));
}

void Client_close(Client& client) {
waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); });
}
Expand Down Expand Up @@ -108,19 +186,25 @@ void Client_subscribeAsync_pattern(Client& client, const std::string& topic_patt
void export_client(py::module_& m) {
py::class_<Client, std::shared_ptr<Client>>(m, "Client")
.def(py::init<const std::string&, const ClientConfiguration&>())
.def_static("create_auto_cluster_failover", &Client_createAutoClusterFailover, py::arg("primary"),
py::arg("secondary"), py::arg("check_interval_ms"), py::arg("failover_threshold"),
py::arg("switch_back_threshold"), py::arg("client_configuration"))
.def_static("create_service_info_provider", &Client_createServiceInfoProvider, py::arg("provider"),
py::arg("client_configuration"))
.def("create_producer", &Client_createProducer)
.def("create_producer_async", &Client_createProducerAsync)
.def("subscribe", &Client_subscribe)
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("create_reader", &Client_createReader)
.def("create_table_view", [](Client& client, const std::string& topic,
const TableViewConfiguration& config) {
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
client.createTableViewAsync(topic, config, callback);
});
})
.def("create_table_view",
[](Client& client, const std::string& topic, const TableViewConfiguration& config) {
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
client.createTableViewAsync(topic, config, callback);
});
})
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_service_info", &Client::getServiceInfo)
.def("get_schema_info", &Client_getSchemaInfo)
.def("close", &Client_close)
.def("close_async", &Client_closeAsync)
Expand Down
Loading
Loading