From cf1e66049caef5a65be1a5e2a21f6ebad065c651 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 2 Jun 2026 11:41:56 +0300 Subject: [PATCH 01/12] AtomicLong for asyncio --- hazelcast/asyncio/__init__.py | 4 + hazelcast/internal/asyncio_client.py | 7 + .../internal/asyncio_proxy/atomic_long.py | 255 ++++++++++++++++++ hazelcast/internal/asyncio_proxy/cp.py | 34 +++ .../internal/asyncio_proxy/cp_manager.py | 79 ++++++ tests/integration/asyncio/base.py | 36 +++ .../asyncio/proxy/atomic_long_test.py | 148 ++++++++++ 7 files changed, 563 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/atomic_long.py create mode 100644 hazelcast/internal/asyncio_proxy/cp.py create mode 100644 hazelcast/internal/asyncio_proxy/cp_manager.py create mode 100644 tests/integration/asyncio/proxy/atomic_long_test.py diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index e06313e9c2..97f42b627f 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -4,6 +4,8 @@ del warnings __all__ = [ + "AtomicLong", + "CPSubsystem", "EntryEventCallable", "Executor", "HazelcastClient", @@ -32,3 +34,5 @@ from hazelcast.internal.asyncio_proxy.set import Set from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener +from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem +from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 5d7bd327f7..bf1adaf417 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -11,6 +11,7 @@ from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation +from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService @@ -203,6 +204,7 @@ def __init__(self, config: Config | None = None, **kwargs): self._compact_schema_service, ) self._proxy_manager = ProxyManager(self._context) + self._cp_subsystem = CPSubsystem(self._context) self._lock_reference_id_generator = AtomicInteger(1) self._statistics = Statistics( self, @@ -519,6 +521,11 @@ def cluster_service(self) -> ClusterService: """ return self._cluster_service + @property + def cp_subsystem(self) -> CPSubsystem: + """CP Subsystem offers set of in-memory linearizable data structures.""" + return self._cp_subsystem + def _create_address_provider(self): config = self._config cluster_members = config.cluster_members diff --git a/hazelcast/internal/asyncio_proxy/atomic_long.py b/hazelcast/internal/asyncio_proxy/atomic_long.py new file mode 100644 index 0000000000..664b097ef7 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/atomic_long.py @@ -0,0 +1,255 @@ +import typing + +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import ( + atomic_long_add_and_get_codec, + atomic_long_compare_and_set_codec, + atomic_long_get_codec, + atomic_long_get_and_add_codec, + atomic_long_get_and_set_codec, + atomic_long_alter_codec, + atomic_long_apply_codec, +) +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_is_int, check_not_none + + +class AtomicLong(BaseCPProxy): + """AtomicLong is a redundant and highly available distributed counter + for 64-bit integers (``long`` type in Java). + + It works on top of the Raft consensus algorithm. It offers linearizability + during crash failures and network partitions. It is CP with respect to + the CAP principle. If a network partition occurs, it remains available + on at most one side of the partition. + + AtomicLong implementation does not offer exactly-once / effectively-once + execution semantics. It goes with at-least-once execution semantics + by default and can cause an API call to be committed multiple times + in case of CP member failures. It can be tuned to offer at-most-once + execution semantics. Please see `fail-on-indeterminate-operation-state` + server-side setting. + """ + + async def add_and_get(self, delta: int) -> int: + """Atomically adds the given value to the current value. + + Args: + delta: The value to add to the current value. + + Returns: + The updated value, the given value added to the current value. + """ + check_is_int(delta) + codec = atomic_long_add_and_get_codec + request = codec.encode_request(self._group_id, self._object_name, delta) + return await self._ainvoke(request, codec.decode_response) + + async def compare_and_set(self, expect: int, update: int) -> bool: + """Atomically sets the value to the given updated value + only if the current value equals the expected value. + + Args: + expect: The expected value. + update: The new value. + + Returns: + ``True`` if successful; or ``False`` if the actual value was not + equal to the expected value. + """ + check_is_int(expect) + check_is_int(update) + codec = atomic_long_compare_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, expect, update) + return await self._ainvoke(request, codec.decode_response) + + async def decrement_and_get(self) -> int: + """Atomically decrements the current value by one. + + Returns: + The updated value, the current value decremented by one. + """ + return await self.add_and_get(-1) + + async def get_and_decrement(self) -> int: + """Atomically decrements the current value by one. + + Returns: + The old value. + """ + return await self.get_and_add(-1) + + async def get(self) -> int: + """Gets the current value. + + Returns: + The current value. + """ + codec = atomic_long_get_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_add(self, delta: int) -> int: + """Atomically adds the given value to the current value. + + Args: + delta: The value to add to the current value. + + Returns: + The old value before the add. + """ + check_is_int(delta) + codec = atomic_long_get_and_add_codec + request = codec.encode_request(self._group_id, self._object_name, delta) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_set(self, new_value: int) -> int: + """Atomically sets the given value and returns the old value. + + Args: + new_value: The new value. + + Returns: + The old value. + """ + check_is_int(new_value) + codec = atomic_long_get_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value) + return await self._ainvoke(request, codec.decode_response) + + async def increment_and_get(self) -> int: + """Atomically increments the current value by one. + + Returns: + The updated value, the current value incremented by one. + """ + return await self.add_and_get(1) + + async def get_and_increment(self) -> int: + """Atomically increments the current value by one. + + Returns: + The old value. + """ + return await self.get_and_add(1) + + async def set(self, new_value: int) -> None: + """Atomically sets the given value. + + Args: + new_value: The new value + """ + check_is_int(new_value) + codec = atomic_long_get_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value) + return await self._ainvoke(request) + + async def alter(self, function: typing.Any) -> None: + """Alters the currently stored value by applying a function on it. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter, function) + + codec = atomic_long_alter_codec + # 1 means return the new value. + # There is no way to tell server to return nothing as of now (30.09.2020) + # The new value is `long` (comes with the initial frame) and we + # don't try to decode it. So, this shouldn't cause any problems. + request = codec.encode_request(self._group_id, self._object_name, function_data, 1) + return await self._ainvoke(request) + + async def alter_and_get(self, function: typing.Any) -> int: + """Alters the currently stored value by applying a function on it and + gets the result. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + + Returns: + The new value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter_and_get, function) + + codec = atomic_long_alter_codec + # 1 means return the new value. + request = codec.encode_request(self._group_id, self._object_name, function_data, 1) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_alter(self, function: typing.Any) -> int: + """Alters the currently stored value by applying a function on it and + gets the old value. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + + Returns: + The old value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_alter, function) + + codec = atomic_long_alter_codec + # 0 means return the old value. + request = codec.encode_request(self._group_id, self._object_name, function_data, 0) + return await self._ainvoke(request, codec.decode_response) + + async def apply(self, function: typing.Any) -> typing.Any: + """Applies a function on the value, the actual stored value will not + change. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function applied to the currently stored value. + + Returns: + The result of the function application. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return self._send_schema_and_retry(e, self.apply, function) + + codec = atomic_long_apply_codec + request = codec.encode_request(self._group_id, self._object_name, function_data) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) diff --git a/hazelcast/internal/asyncio_proxy/cp.py b/hazelcast/internal/asyncio_proxy/cp.py new file mode 100644 index 0000000000..333500a641 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/cp.py @@ -0,0 +1,34 @@ +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.protocol.codec import cp_group_destroy_cp_object_codec + + +def _no_op_response_handler(_): + return None + + +class BaseCPProxy: + def __init__(self, context, group_id, service_name, proxy_name, object_name): + self._group_id = group_id + self._service_name = service_name + self._proxy_name = proxy_name + self._object_name = object_name + self._invocation_service = context.invocation_service + serialization_service = context.serialization_service + self._to_data = serialization_service.to_data + self._to_object = serialization_service.to_object + self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry + + async def destroy(self) -> None: + """Destroys this proxy.""" + codec = cp_group_destroy_cp_object_codec + request = codec.encode_request(self._group_id, self._service_name, self._object_name) + return await self._ainvoke(request) + + def _invoke(self, request, response_handler=_no_op_response_handler): + invocation = Invocation(request, response_handler=response_handler) + self._invocation_service.invoke(invocation) + return invocation.future + + async def _ainvoke(self, request, response_handler=_no_op_response_handler): + fut = self._invoke(request, response_handler) + return await fut diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py new file mode 100644 index 0000000000..dea872be06 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -0,0 +1,79 @@ +from hazelcast.cp import ( + _without_default_group_name, + _get_object_name_for_proxy, + ATOMIC_LONG_SERVICE, +) +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.protocol.codec import cp_group_create_cp_group_codec + + +class CPSubsystem: + """CP Subsystem is a component of Hazelcast that builds a strongly + consistent layer for a set of distributed data structures. + + Its APIs can be used for implementing distributed coordination use cases, + such as leader election, distributed locking, synchronization, and metadata + management. + + Its data structures are CP with respect to the CAP principle, i.e., they + always maintain linearizability and prefer consistency to availability + during network partitions. Besides network partitions, CP Subsystem + withstands server and client failures. + + Data structures in CP Subsystem run in CP groups. Each CP group elects + its own Raft leader and runs the Raft consensus algorithm independently. + + The CP data structures differ from the other Hazelcast data structures + in two aspects. First, an internal commit is performed on the METADATA CP + group every time you fetch a proxy from this interface. Hence, callers + should cache returned proxy objects. Second, if you call ``destroy()`` + on a CP data structure proxy, that data structure is terminated on the + underlying CP group and cannot be reinitialized until the CP group is + force-destroyed. For this reason, please make sure that you are completely + done with a CP data structure before destroying its proxy. + """ + + def __init__(self, context): + self._proxy_manager = CPProxyManager(context) + + async def get_atomic_long(self, name: str) -> AtomicLong: + """Returns the distributed AtomicLong instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + AtomicLong instance will be created on the default CP group. + If a group name is given, like ``.get_atomic_long("myLong@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name: Name of the AtomicLong. + + Returns: + The AtomicLong proxy for the given name. + """ + return await self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name) + + +class CPProxyManager: + def __init__(self, context): + self._context = context + + async def get_or_create(self, service_name, proxy_name): + proxy_name = _without_default_group_name(proxy_name) + object_name = _get_object_name_for_proxy(proxy_name) + + group_id = await self._get_group_id(proxy_name) + if service_name == ATOMIC_LONG_SERVICE: + return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) + + raise ValueError("Unknown service name: %s" % service_name) + + async def _get_group_id(self, proxy_name): + codec = cp_group_create_cp_group_codec + request = codec.encode_request(proxy_name) + invocation = Invocation(request, response_handler=codec.decode_response) + invocation_service = self._context.invocation_service + return await invocation_service.ainvoke(invocation) diff --git a/tests/integration/asyncio/base.py b/tests/integration/asyncio/base.py index 55c92e1b50..8fbb2a87d3 100644 --- a/tests/integration/asyncio/base.py +++ b/tests/integration/asyncio/base.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import unittest from typing import Awaitable @@ -121,3 +122,38 @@ async def asyncSetUp(self): async def asyncTearDown(self): await self.client.shutdown() + + +class CPTestCase(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + + rc = None + cluster = None + client = None + + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, cls.configure_cluster()) + cls.cluster.start_member() + cls.cluster.start_member() + cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open( + os.path.join(dir_path, "../backward_compatible/proxy/cp/hazelcast_cpsubsystem.xml") + ) as f: + return f.read() + + async def asyncSetUp(self): + self.client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + + async def asyncTearDown(self): + await self.client.shutdown() diff --git a/tests/integration/asyncio/proxy/atomic_long_test.py b/tests/integration/asyncio/proxy/atomic_long_test.py new file mode 100644 index 0000000000..5acb0df721 --- /dev/null +++ b/tests/integration/asyncio/proxy/atomic_long_test.py @@ -0,0 +1,148 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError +from hazelcast.serialization.api import IdentifiedDataSerializable +from tests.integration.asyncio.base import CPTestCase +from tests.util import skip_if_server_version_older_than + + +class Multiplication(IdentifiedDataSerializable): + def __init__(self, multiplier): + self.multiplier = multiplier + + def write_data(self, object_data_output): + object_data_output.write_long(self.multiplier) + + def read_data(self, object_data_input): + pass + + def get_factory_id(self): + return 66 + + def get_class_id(self): + return 16 + + +@pytest.mark.enterprise +class AtomicLongTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.atomic_long = await self.client.cp_subsystem.get_atomic_long("long") + + async def asyncTearDown(self): + await self.atomic_long.set(0) + await super().asyncTearDown() + + async def test_atomic_long_in_another_group(self): + another_long = await self.client.cp_subsystem.get_atomic_long("long@mygroup") + self.assertEqual(1, await another_long.increment_and_get()) + # the following value has to be 0, + # as `along` belongs to the default CP group + self.assertEqual(0, await self.atomic_long.get()) + + async def test_use_after_destroy(self): + another_long = await self.client.cp_subsystem.get_atomic_long("another-long") + await another_long.destroy() + # the next destroy call should be ignored + await another_long.destroy() + + try: + await another_long.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + another_long2 = await self.client.cp_subsystem.get_atomic_long("another-long") + try: + await another_long2.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_initial_value(self): + self.assertEqual(0, await self.atomic_long.get()) + + async def test_add_and_get(self): + self.assertEqual(33, await self.atomic_long.add_and_get(33)) + self.assertEqual(33, await self.atomic_long.get()) + + async def test_compare_and_set_when_condition_is_met(self): + self.assertTrue(await self.atomic_long.compare_and_set(0, 23)) + self.assertEqual(23, await self.atomic_long.get()) + + async def test_compare_and_set_when_condition_is_not_met(self): + self.assertFalse(await self.atomic_long.compare_and_set(1, 23)) + self.assertEqual(0, await self.atomic_long.get()) + + async def test_decrement_and_get(self): + self.assertEqual(-1, await self.atomic_long.decrement_and_get()) + self.assertEqual(-2, await self.atomic_long.decrement_and_get()) + self.assertEqual(-2, await self.atomic_long.get()) + + async def test_get_and_decrement(self): + self.assertEqual(0, await self.atomic_long.get_and_decrement()) + self.assertEqual(-1, await self.atomic_long.get_and_decrement()) + self.assertEqual(-2, await self.atomic_long.get()) + + async def test_get(self): + self.assertEqual(0, await self.atomic_long.get()) + await self.atomic_long.set(11) + self.assertEqual(11, await self.atomic_long.get()) + long_max = 2**63 - 1 + await self.atomic_long.set(long_max) + self.assertEqual(long_max, await self.atomic_long.get()) + long_min = -(2**63) + await self.atomic_long.set(long_min) + self.assertEqual(long_min, await self.atomic_long.get()) + + async def test_get_and_add(self): + self.assertEqual(0, await self.atomic_long.get_and_add(-100)) + self.assertEqual(-100, await self.atomic_long.get()) + + async def test_get_and_set(self): + self.assertEqual(0, await self.atomic_long.get_and_set(123)) + self.assertEqual(123, await self.atomic_long.get()) + + async def test_increment_and_get(self): + self.assertEqual(1, await self.atomic_long.increment_and_get()) + self.assertEqual(2, await self.atomic_long.increment_and_get()) + self.assertEqual(2, await self.atomic_long.get()) + + async def test_get_and_increment(self): + self.assertEqual(0, await self.atomic_long.get_and_increment()) + self.assertEqual(1, await self.atomic_long.get_and_increment()) + self.assertEqual(2, await self.atomic_long.get()) + + async def test_set(self): + self.assertIsNone(await self.atomic_long.set(42)) + self.assertEqual(42, await self.atomic_long.get()) + + async def test_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(2) + self.assertIsNone(await self.atomic_long.alter(Multiplication(5))) + self.assertEqual(10, await self.atomic_long.get()) + + async def test_alter_and_get(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(-3) + self.assertEqual(-9, await self.atomic_long.alter_and_get(Multiplication(3))) + self.assertEqual(-9, await self.atomic_long.get()) + + async def test_get_and_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(123) + self.assertEqual(123, await self.atomic_long.get_and_alter(Multiplication(-1))) + self.assertEqual(-123, await self.atomic_long.get()) + + async def test_apply(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(42) + self.assertEqual(84, await self.atomic_long.apply(Multiplication(2))) + self.assertEqual(42, await self.atomic_long.get()) From c493901b6ccc12b48f50ce79c60aa2c7d29812e1 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 2 Jun 2026 13:39:36 +0300 Subject: [PATCH 02/12] AtomicReference for Asyncio --- .../asyncio_proxy/atomic_reference.py | 277 ++++++++++++++++++ .../internal/asyncio_proxy/cp_manager.py | 24 ++ .../asyncio/proxy/atomic_reference_test.py | 144 +++++++++ 3 files changed, 445 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/atomic_reference.py create mode 100644 tests/integration/asyncio/proxy/atomic_reference_test.py diff --git a/hazelcast/internal/asyncio_proxy/atomic_reference.py b/hazelcast/internal/asyncio_proxy/atomic_reference.py new file mode 100644 index 0000000000..0d12013db8 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/atomic_reference.py @@ -0,0 +1,277 @@ +import typing + +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import ( + atomic_ref_compare_and_set_codec, + atomic_ref_get_codec, + atomic_ref_set_codec, + atomic_ref_contains_codec, + atomic_ref_apply_codec, +) +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.types import ElementType +from hazelcast.util import check_not_none + + +class AtomicReference(BaseCPProxy, typing.Generic[ElementType]): + """A distributed, highly available object reference with atomic operations. + + AtomicReference offers linearizability during crash failures and network + partitions. It is CP with respect to the CAP principle. If a network + partition occurs, it remains available on at most one side of the + partition. + + The following are some considerations you need to know when you use + AtomicReference: + + - AtomicReference works based on the byte-content and not on the + object-reference. If you use the ``compare_and_set()`` method, do not + change the original value because its serialized content will then be + different. + - All methods returning an object return a private copy. You can modify the + private copy, but the rest of the world is shielded from your changes. If + you want these changes to be visible to the rest of the world, you need + to write the change back to the AtomicReference; but be careful about + introducing a data-race. + - The in-memory format of an AtomicReference is ``binary``. The receiving + side does not need to have the class definition available unless it needs + to be deserialized on the other side., e.g., because a method like + `alter()` is executed. This deserialization is done for every call that + needs to have the object instead of the binary content, so be careful + with expensive object graphs that need to be deserialized. + - If you have an object with many fields or an object graph, and you only + need to calculate some information or need a subset of fields, you can + use the `apply()` method. With the `apply()` method, the whole object + does not need to be sent over the line; only the information that is + relevant is sent. + + IAtomicReference does not offer exactly-once / effectively-once + execution semantics. It goes with at-least-once execution semantics + by default and can cause an API call to be committed multiple times + in case of CP member failures. It can be tuned to offer at-most-once + execution semantics. Please see `fail-on-indeterminate-operation-state` + server-side setting. + """ + + async def compare_and_set( + self, expect: typing.Optional[ElementType], update: typing.Optional[ElementType] + ) -> bool: + """Atomically sets the value to the given updated value + only if the current value is equal to the expected value. + + Args: + expect: The expected value. + update: The new value. + + Returns: + ``True`` if successful, or ``False`` if the actual value was not + equal to the expected value. + """ + try: + expected_data = self._to_data(expect) + new_data = self._to_data(update) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.compare_and_set, expect, update) + + codec = atomic_ref_compare_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, expected_data, new_data) + return await self._ainvoke(request, codec.decode_response) + + async def get(self) -> typing.Optional[ElementType]: + """Gets the current value. + + Returns: + The current value. + """ + codec = atomic_ref_get_codec + request = codec.encode_request(self._group_id, self._object_name) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def set(self, new_value: typing.Optional[ElementType]) -> None: + """Atomically sets the given value. + + Args: + new_value: The new value. + """ + try: + new_value_data = self._to_data(new_value) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.set, new_value) + + codec = atomic_ref_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value_data, False) + return await self._ainvoke(request) + + async def get_and_set( + self, new_value: typing.Optional[ElementType] + ) -> typing.Optional[ElementType]: + """Gets the old value and sets the new value. + + Args: + new_value: The new value. + + Returns: + The old value. + """ + try: + new_value_data = self._to_data(new_value) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_set, new_value) + + codec = atomic_ref_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value_data, True) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def is_none(self) -> bool: + """Checks if the stored reference is ``None``. + + Returns: + ``True`` if the stored reference is ``None``, ``False`` otherwise. + """ + return await self.contains(None) + + async def clear(self) -> None: + """Clears the current stored reference, so it becomes ``None``.""" + return await self.set(None) + + async def contains(self, value: typing.Optional[ElementType]) -> bool: + """Checks if the reference contains the value. + + Args: + value: The value to check (is allowed to be ``None``). + + Returns: + ``True`` if the value is found, ``False`` otherwise. + """ + try: + value_data = self._to_data(value) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.contains, value) + + codec = atomic_ref_contains_codec + request = codec.encode_request(self._group_id, self._object_name, value_data) + return await self._ainvoke(request, codec.decode_response) + + async def alter(self, function: typing.Any) -> None: + """Alters the currently stored reference by applying a function on it. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored reference. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter, function) + + codec = atomic_ref_apply_codec + # 0 means don't return the value + request = codec.encode_request(self._group_id, self._object_name, function_data, 0, True) + return await self._ainvoke(request) + + async def alter_and_get(self, function: typing.Any) -> typing.Optional[ElementType]: + """Alters the currently stored reference by applying a function on it + and gets the result. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored reference. + + Returns: + The new value, the result of the applied function. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter_and_get, function) + + codec = atomic_ref_apply_codec + # 2 means return the new value + request = codec.encode_request(self._group_id, self._object_name, function_data, 2, True) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def get_and_alter(self, function: typing.Any) -> typing.Optional[ElementType]: + """Alters the currently stored reference by applying a function on it + on and gets the old value. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored reference. + + Returns: + The old value, the value before the function is applied. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_alter, function) + + codec = atomic_ref_apply_codec + # 1 means return the old value + request = codec.encode_request(self._group_id, self._object_name, function_data, 1, True) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def apply(self, function: typing.Any) -> typing.Optional[ElementType]: + """Applies a function on the value, the actual stored value will not + change. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function applied on the currently stored reference. + + Returns: + The result of the function application. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.apply, function) + + codec = atomic_ref_apply_codec + # 2 means return the new value + request = codec.encode_request(self._group_id, self._object_name, function_data, 2, False) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index dea872be06..4141ec45c7 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -2,9 +2,11 @@ _without_default_group_name, _get_object_name_for_proxy, ATOMIC_LONG_SERVICE, + ATOMIC_REFERENCE_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.protocol.codec import cp_group_create_cp_group_codec @@ -56,6 +58,26 @@ async def get_atomic_long(self, name: str) -> AtomicLong: """ return await self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name) + async def get_atomic_reference(self, name: str) -> AtomicReference: + """Returns the distributed AtomicReference instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + AtomicLong instance will be created on the DEFAULT CP group. + If a group name is given, like + ``.get_atomic_reference("myRef@group1")``, the given group will be + initialized first, if not initialized already, and then the instance + will be created on this group. + + Args: + name: Name of the AtomicReference. + + Returns: + The AtomicReference proxy for the given name. + """ + return await self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name) + class CPProxyManager: def __init__(self, context): @@ -68,6 +90,8 @@ async def get_or_create(self, service_name, proxy_name): group_id = await self._get_group_id(proxy_name) if service_name == ATOMIC_LONG_SERVICE: return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == ATOMIC_REFERENCE_SERVICE: + return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) diff --git a/tests/integration/asyncio/proxy/atomic_reference_test.py b/tests/integration/asyncio/proxy/atomic_reference_test.py new file mode 100644 index 0000000000..404bab4980 --- /dev/null +++ b/tests/integration/asyncio/proxy/atomic_reference_test.py @@ -0,0 +1,144 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, ClassCastError +from tests.integration.asyncio.base import CPTestCase +from tests.integration.backward_compatible.proxy.cp.atomic_reference_test import AppendString +from tests.util import skip_if_server_version_older_than + + +@pytest.mark.enterprise +class AtomicReferenceTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.ref = await self.client.cp_subsystem.get_atomic_reference("ref") + + async def asyncTearDown(self): + await self.ref.clear() + await super().asyncTearDown() + + async def test_ref_in_another_group(self): + another_ref = await self.client.cp_subsystem.get_atomic_reference("ref@mygroup") + await another_ref.set("hey") + self.assertEqual("hey", await another_ref.get()) + # the following value has to be None, + # as `ref` belongs to the default CP group + self.assertIsNone(await self.ref.get()) + + async def test_use_after_destroy(self): + another_ref = await self.client.cp_subsystem.get_atomic_reference("another-ref") + await another_ref.destroy() + # the next destroy call should be ignored + await another_ref.destroy() + + try: + await another_ref.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + another_ref2 = await self.client.cp_subsystem.get_atomic_reference("another-ref") + + try: + await another_ref2.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_initial_value(self): + self.assertIsNone(await self.ref.get()) + + async def test_compare_and_set_when_condition_is_met(self): + self.assertTrue(await self.ref.compare_and_set(None, 42)) + self.assertEqual(42, await self.ref.get()) + self.assertTrue(await self.ref.compare_and_set(42, "hey")) + self.assertEqual("hey", await self.ref.get()) + + async def test_compare_and_set_when_condition_is_not_met(self): + self.assertFalse(await self.ref.compare_and_set(42, 23)) + self.assertIsNone(await self.ref.get()) + await self.ref.set("a") + self.assertFalse(await self.ref.compare_and_set("b", "c")) + self.assertEqual("a", await self.ref.get()) + + async def test_get(self): + await self.ref.set([1, 2, 3]) + self.assertEqual([1, 2, 3], await self.ref.get()) + + async def test_set(self): + self.assertIsNone(await self.ref.set("abc")) + self.assertEqual("abc", await self.ref.get()) + self.assertIsNone(await self.ref.set(["another_type", 1])) + self.assertEqual(["another_type", 1], await self.ref.get()) + + async def test_get_and_set(self): + self.assertIsNone(await self.ref.get_and_set("42")) + self.assertEqual("42", await self.ref.get()) + self.assertEqual("42", await self.ref.get_and_set(42)) + self.assertEqual(42, await self.ref.get()) + + async def test_is_none(self): + self.assertTrue(await self.ref.is_none()) + await self.ref.set(11) + self.assertFalse(await self.ref.is_none()) + await self.ref.set(None) + self.assertTrue(await self.ref.is_none()) + + async def test_clear(self): + self.assertIsNone(await self.ref.clear()) + self.assertIsNone(await self.ref.get()) + await self.ref.set("str") + self.assertEqual("str", await self.ref.get()) + self.assertIsNone(await self.ref.clear()) + self.assertIsNone(await self.ref.get()) + + async def test_contains(self): + self.assertTrue(await self.ref.contains(None)) + await self.ref.set("42") + self.assertTrue(await self.ref.contains("42")) + self.assertFalse(await self.ref.contains(42)) + self.assertFalse(await self.ref.contains(None)) + await self.ref.clear() + self.assertFalse(await self.ref.contains("42")) + self.assertTrue(await self.ref.contains(None)) + + async def test_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("hey") + self.assertIsNone(await self.ref.alter(AppendString("123"))) + self.assertEqual("hey123", await self.ref.get()) + + async def test_alter_with_incompatible_types(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set(42) + + try: + await self.ref.alter(AppendString(".")) + except ClassCastError: + pass + else: + self.fail("expected ClassCastError to be raised") + + async def test_alter_and_get(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("123") + self.assertEqual("123...", await self.ref.alter_and_get(AppendString("..."))) + self.assertEqual("123...", await self.ref.get()) + + async def test_get_and_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("hell") + self.assertEqual("hell", await self.ref.get_and_alter(AppendString("o"))) + self.assertEqual("hello", await self.ref.get()) + + async def test_apply(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("hell") + self.assertEqual("hello", await self.ref.apply(AppendString("o"))) + self.assertEqual("hell", await self.ref.get()) From 4de45d196fd9ed39ecb80a3aa32a69175ce6f498 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 11:50:35 +0300 Subject: [PATCH 03/12] Added CountdownLatch for Asyncio --- .../internal/asyncio_proxy/countdown_latch.py | 138 +++++++++++++++ .../internal/asyncio_proxy/cp_manager.py | 25 ++- .../asyncio/proxy/countdown_latch_test.py | 159 ++++++++++++++++++ 3 files changed, 321 insertions(+), 1 deletion(-) create mode 100644 hazelcast/internal/asyncio_proxy/countdown_latch.py create mode 100644 tests/integration/asyncio/proxy/countdown_latch_test.py diff --git a/hazelcast/internal/asyncio_proxy/countdown_latch.py b/hazelcast/internal/asyncio_proxy/countdown_latch.py new file mode 100644 index 0000000000..3d8236eff0 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/countdown_latch.py @@ -0,0 +1,138 @@ +import uuid + +from hazelcast.errors import OperationTimeoutError +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \ + count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec +from hazelcast.util import check_is_number, to_millis, check_is_int, check_true + + +class CountDownLatch(BaseCPProxy): + """A distributed, concurrent countdown latch data structure. + + CountDownLatch is a cluster-wide synchronization aid + that allows one or more callers to wait until a set of operations being + performed in other callers completes. + + CountDownLatch count can be reset using ``try_set_count()`` method after + a countdown has finished but not during an active count. This allows + the same latch instance to be reused. + + There is no ``await_latch()`` method to wait indefinitely since this is + undesirable in a distributed application: for example, a cluster can split + or the master and replicas could all terminate. In most cases, it is best + to configure an explicit timeout, so you have the ability to deal with + these situations. + + All the API methods in the CountDownLatch offer the exactly-once + execution semantics. For instance, even if a ``count_down()`` call is + internally retried because of crashed Hazelcast member, the counter + value is decremented only once. + """ + + async def await_latch(self, timeout: float) -> bool: + """Causes the current thread to wait until the latch has counted down to + zero, or an exception is thrown, or the specified waiting time elapses. + + If the current count is zero then this method returns ``True``. + + If the current count is greater than zero, then the current + thread becomes disabled for thread scheduling purposes and lies + dormant until one of the following things happen: + + - The count reaches zero due to invocations of the ``count_down()`` + method + - This CountDownLatch instance is destroyed + - The countdown owner becomes disconnected + - The specified waiting time elapses + + If the count reaches zero, then the method returns with the + value ``True``. + + If the specified waiting time elapses then the value ``False`` + is returned. If the time is less than or equal to zero, the method + will not wait at all. + + Args: + timeout: The maximum time to wait in seconds + + Returns: + ``True`` if the count reached zero, ``False`` if the waiting time + elapsed before the count reached zero + Raises: + IllegalStateError: If the Hazelcast instance was shut down while + waiting. + """ + check_is_number(timeout) + timeout = max(0.0, timeout) + invocation_uuid = uuid.uuid4() + codec = count_down_latch_await_codec + request = codec.encode_request( + self._group_id, self._object_name, invocation_uuid, to_millis(timeout) + ) + return await self._ainvoke(request, codec.decode_response) + + async def count_down(self) -> None: + """Decrements the count of the latch, releasing all waiting threads if + the count reaches zero. + + If the current count is greater than zero, then it is decremented. + If the new count is zero: + + - All waiting threads are re-enabled for thread scheduling purposes + - Countdown owner is set to ``None``. + + If the current count equals zero, then nothing happens. + """ + invocation_uuid = uuid.uuid4() + res = await self._get_round() + return await self._do_count_down(res, invocation_uuid) + + async def get_count(self) -> int: + """Returns the current count. + + Returns: + The current count. + """ + codec = count_down_latch_get_count_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def try_set_count(self, count: int) -> bool: + """Sets the count to the given value if the current count is zero. + + If count is not zero, then this method does nothing and returns + ``False``. + + Args: + count: The number of times ``count_down()`` must be invoked before + callers can pass through ``await_latch()``. + + Returns: + ``True`` if the new count was set, ``False`` if the current count + is not zero. + """ + check_is_int(count) + check_true(count > 0, "Count must be positive") + codec = count_down_latch_try_set_count_codec + request = codec.encode_request(self._group_id, self._object_name, count) + return await self._ainvoke(request, codec.decode_response) + + async def _do_count_down(self, expected_round, invocation_uuid): + try: + return await self._request_count_down(expected_round, invocation_uuid) + except OperationTimeoutError: + # we can retry safely because the retry is idempotent + return await self._do_count_down(expected_round, invocation_uuid) + + async def _get_round(self): + codec = count_down_latch_get_round_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def _request_count_down(self, expected_round, invocation_uuid): + codec = count_down_latch_count_down_codec + request = codec.encode_request( + self._group_id, self._object_name, invocation_uuid, expected_round + ) + return await self._ainvoke(request) diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 4141ec45c7..39d36ae525 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -2,11 +2,12 @@ _without_default_group_name, _get_object_name_for_proxy, ATOMIC_LONG_SERVICE, - ATOMIC_REFERENCE_SERVICE, + ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference +from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch from hazelcast.protocol.codec import cp_group_create_cp_group_codec @@ -78,6 +79,26 @@ async def get_atomic_reference(self, name: str) -> AtomicReference: """ return await self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name) + async def get_count_down_latch(self, name: str) -> CountDownLatch: + """Returns the distributed CountDownLatch instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + CountDownLatch instance will be created on the DEFAULT CP group. + If a group name is given, like + ``.get_count_down_latch("myLatch@group1")``, the given group will be + initialized first, if not initialized already, and then the instance + will be created on this group. + + Args: + name: Name of the CountDownLatch. + + Returns: + The CountDownLatch proxy for the given name. + """ + return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name) + class CPProxyManager: def __init__(self, context): @@ -92,6 +113,8 @@ async def get_or_create(self, service_name, proxy_name): return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) elif service_name == ATOMIC_REFERENCE_SERVICE: return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == COUNT_DOWN_LATCH_SERVICE: + return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) diff --git a/tests/integration/asyncio/proxy/countdown_latch_test.py b/tests/integration/asyncio/proxy/countdown_latch_test.py new file mode 100644 index 0000000000..8aac90428f --- /dev/null +++ b/tests/integration/asyncio/proxy/countdown_latch_test.py @@ -0,0 +1,159 @@ +import asyncio +import os + +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, OperationTimeoutError +from hazelcast.util import AtomicInteger +from tests.integration.asyncio.base import CPTestCase +from tests.util import random_string, get_current_timestamp + +inf = 2**31 - 1 + + +@pytest.mark.enterprise +class CountDownLatchTest(CPTestCase): + + async def test_latch_in_another_group(self): + latch = await self.get_latch() + another_latch = await self.client.cp_subsystem.get_count_down_latch( + latch._proxy_name + "@another" + ) + await another_latch.try_set_count(42) + self.assertEqual(42, await another_latch.get_count()) + self.assertNotEqual(42, await latch.get_count()) + + async def test_use_after_destroy(self): + latch = await self.get_latch() + await latch.destroy() + # the next destroy call should be ignored + await latch.destroy() + + try: + await latch.get_count() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + latch2 = await self.client.cp_subsystem.get_count_down_latch(latch._proxy_name) + + try: + await latch2.get_count() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_await_latch_negative_timeout(self): + latch = await self.get_latch(1) + self.assertFalse(await latch.await_latch(-1)) + + async def test_await_latch_zero_timeout(self): + latch = await self.get_latch(1) + self.assertFalse(await latch.await_latch(0)) + + async def test_await_latch_with_timeout(self): + timeout = 1 + latch = await self.get_latch(1) + start = get_current_timestamp() + self.assertFalse(await latch.await_latch(timeout)) + time_passed = get_current_timestamp() - start + expected_time_passed = timeout + if os.name == "nt": + # On Windows, we were getting random test failures due to expected + # time passed being slightly less than the timeout. This is due to + # the low time resolution there (15-16ms). If we are on Windows, we + # lower our expectations and settle for a slightly lower value. + expected_time_passed *= 0.95 + + self.assertTrue( + time_passed >= expected_time_passed, + "Time passed is less than %s, which is %s" % (expected_time_passed, time_passed), + ) + + async def test_await_latch_multiple_waiters(self): + latch = await self.get_latch(1) + # TODO: replace the following with the asyncio variant when implemented + completed = AtomicInteger() + + async def run(): + await latch.await_latch(inf) + completed.get_and_increment() + + count = 10 + tasks = [] + for _ in range(count): + tasks.append(asyncio.create_task(run())) + + await latch.count_down() + + def assertion(): + self.assertEqual(count, completed.get()) + + await self.assertTrueEventually(assertion) + + async def test_await_latch_response_on_count_down(self): + latch = await self.get_latch() + self.assertTrue(await latch.await_latch(inf)) + self.assertTrue(await latch.try_set_count(1)) + # make a non-blocking request + future = asyncio.create_task(latch.await_latch(inf)) + asyncio.create_task(latch.count_down()) + self.assertTrue(await future) + + async def test_count_down(self): + latch = await self.get_latch(10) + + for i in range(9, -1, -1): + self.assertIsNone(await latch.count_down()) + self.assertEqual(i, await latch.get_count()) + + async def test_count_down_retry_on_timeout(self): + latch = await self.get_latch(1) + original = latch._request_count_down + # TODO: replace the following with the asyncio variant when implemented + called_count = AtomicInteger() + + async def mock(expected_round, invocation_uuid): + if called_count.get_and_increment() < 2: + raise OperationTimeoutError("xx") + return await original(expected_round, invocation_uuid) + + latch._request_count_down = mock + await latch.count_down() + # Will resolve on it's third call. First 2 throws timeout error + self.assertEqual(3, called_count.get()) + self.assertEqual(0, await latch.get_count()) + + async def test_get_count(self): + latch = await self.get_latch(1) + self.assertEqual(1, await latch.get_count()) + await latch.count_down() + self.assertEqual(0, await latch.get_count()) + await latch.try_set_count(10) + self.assertEqual(10, await latch.get_count()) + + async def test_try_set_count(self): + latch = await self.get_latch() + self.assertTrue(await latch.try_set_count(3)) + self.assertEqual(3, await latch.get_count()) + + async def test_try_set_count_when_count_is_already_set(self): + latch = await self.get_latch(1) + self.assertFalse(await latch.try_set_count(10)) + self.assertFalse(await latch.try_set_count(20)) + self.assertEqual(1, await latch.get_count()) + + async def test_try_set_count_when_count_goes_to_zero(self): + latch = await self.get_latch(1) + await latch.count_down() + self.assertEqual(0, await latch.get_count()) + self.assertTrue(await latch.try_set_count(3)) + self.assertEqual(3, await latch.get_count()) + + async def get_latch(self, initial_count=None): + latch = await self.client.cp_subsystem.get_count_down_latch("latch-" + random_string()) + if initial_count is not None: + self.assertTrue(await latch.try_set_count(initial_count)) + return latch From aac1fc3cf8573217f531c19fa12ac04ed78fe147 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 11:51:17 +0300 Subject: [PATCH 04/12] black --- hazelcast/internal/asyncio_proxy/countdown_latch.py | 9 +++++++-- hazelcast/internal/asyncio_proxy/cp_manager.py | 3 ++- tests/integration/asyncio/proxy/countdown_latch_test.py | 1 - 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/countdown_latch.py b/hazelcast/internal/asyncio_proxy/countdown_latch.py index 3d8236eff0..28a7956adf 100644 --- a/hazelcast/internal/asyncio_proxy/countdown_latch.py +++ b/hazelcast/internal/asyncio_proxy/countdown_latch.py @@ -2,8 +2,13 @@ from hazelcast.errors import OperationTimeoutError from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy -from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \ - count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec +from hazelcast.protocol.codec import ( + count_down_latch_await_codec, + count_down_latch_get_round_codec, + count_down_latch_count_down_codec, + count_down_latch_get_count_codec, + count_down_latch_try_set_count_codec, +) from hazelcast.util import check_is_number, to_millis, check_is_int, check_true diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 39d36ae525..3749d40ea1 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -2,7 +2,8 @@ _without_default_group_name, _get_object_name_for_proxy, ATOMIC_LONG_SERVICE, - ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, + ATOMIC_REFERENCE_SERVICE, + COUNT_DOWN_LATCH_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong diff --git a/tests/integration/asyncio/proxy/countdown_latch_test.py b/tests/integration/asyncio/proxy/countdown_latch_test.py index 8aac90428f..74ec5f11bb 100644 --- a/tests/integration/asyncio/proxy/countdown_latch_test.py +++ b/tests/integration/asyncio/proxy/countdown_latch_test.py @@ -13,7 +13,6 @@ @pytest.mark.enterprise class CountDownLatchTest(CPTestCase): - async def test_latch_in_another_group(self): latch = await self.get_latch() another_latch = await self.client.cp_subsystem.get_count_down_latch( From d3b6199c7797c48d58736f40e2ec4088a0a7f8e6 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 11:54:52 +0300 Subject: [PATCH 05/12] CountdownLatch in public API --- hazelcast/asyncio/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index ba675e0c2e..e18bc367cb 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -1,6 +1,7 @@ __all__ = [ "AtomicLong", "CPSubsystem", + "CountDownLatch", "EntryEventCallable", "Executor", "HazelcastClient", @@ -31,3 +32,4 @@ from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch From 90190948032a6e9f9313b1e12528fe439bb96cfc Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 12:10:46 +0300 Subject: [PATCH 06/12] AtomicReference in public API --- hazelcast/asyncio/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index ba675e0c2e..89b0cc629c 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -1,5 +1,6 @@ __all__ = [ "AtomicLong", + "AtomicReference", "CPSubsystem", "EntryEventCallable", "Executor", @@ -31,3 +32,4 @@ from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference \ No newline at end of file From 6c6b6c8e9f7c65cadbb9890a593ebcc2b2c87695 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 12:14:02 +0300 Subject: [PATCH 07/12] black --- hazelcast/asyncio/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 89b0cc629c..cc94195cc0 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -32,4 +32,4 @@ from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong -from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference \ No newline at end of file +from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference From 18a4c4dc333c49f1dd660eac160c50b5a40ae9f3 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 14:53:24 +0300 Subject: [PATCH 08/12] review comment --- hazelcast/internal/asyncio_proxy/atomic_long.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/internal/asyncio_proxy/atomic_long.py b/hazelcast/internal/asyncio_proxy/atomic_long.py index 664b097ef7..7bf8190ddc 100644 --- a/hazelcast/internal/asyncio_proxy/atomic_long.py +++ b/hazelcast/internal/asyncio_proxy/atomic_long.py @@ -244,7 +244,7 @@ async def apply(self, function: typing.Any) -> typing.Any: try: function_data = self._to_data(function) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.apply, function) + return await self._send_schema_and_retry(e, self.apply, function) codec = atomic_long_apply_codec request = codec.encode_request(self._group_id, self._object_name, function_data) From 4a9126a5045a6ab3a9f2d2f49fadabb9eedc61ea Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Fri, 5 Jun 2026 10:08:10 +0300 Subject: [PATCH 09/12] Ported Semaphore to asyncio --- hazelcast/asyncio/__init__.py | 2 + hazelcast/internal/asyncio_client.py | 7 + hazelcast/internal/asyncio_proxy/cp.py | 197 ++++++- .../internal/asyncio_proxy/cp_manager.py | 41 +- hazelcast/internal/asyncio_proxy/semaphore.py | 539 ++++++++++++++++++ .../asyncio/proxy/semaphore_test.py | 271 +++++++++ 6 files changed, 1055 insertions(+), 2 deletions(-) create mode 100644 hazelcast/internal/asyncio_proxy/semaphore.py create mode 100644 tests/integration/asyncio/proxy/semaphore_test.py diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 98bb081f28..0cdefdc6e5 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -15,6 +15,7 @@ "ReliableTopic", "ReplicatedMap", "Ringbuffer", + "Semaphore", "Set", "VectorCollection", ] @@ -35,3 +36,4 @@ from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch +from hazelcast.internal.asyncio_proxy.semaphore import Semaphore diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index c266a42a59..e1fdd0cfce 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -11,6 +11,7 @@ from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation +from hazelcast.internal.asyncio_proxy.cp import ProxySessionManager from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection @@ -209,6 +210,7 @@ def __init__(self, config: Config | None = None, **kwargs): ) self._proxy_manager = ProxyManager(self._context) self._cp_subsystem = CPSubsystem(self._context) + self._proxy_session_manager = ProxySessionManager(self._context) self._lock_reference_id_generator = AtomicInteger(1) self._statistics = Statistics( self, @@ -248,6 +250,7 @@ def _init_context(self): self._near_cache_manager, self._lock_reference_id_generator, self._name, + self._proxy_session_manager, self._reactor, self._compact_schema_service, ) @@ -493,6 +496,7 @@ async def shutdown(self) -> None: if self._internal_lifecycle_service.running: self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN) self._internal_lifecycle_service.shutdown() + await self._proxy_session_manager.shutdown() self._near_cache_manager.destroy_near_caches() await self._connection_manager.shutdown() self._invocation_service.shutdown() @@ -590,6 +594,7 @@ def __init__(self): self.near_cache_manager = None self.lock_reference_id_generator = None self.name = None + self.proxy_session_manager = None self.reactor = None self.compact_schema_service = None @@ -607,6 +612,7 @@ def init_context( near_cache_manager, lock_reference_id_generator, name, + proxy_session_manager, reactor, compact_schema_service, ): @@ -622,5 +628,6 @@ def init_context( self.near_cache_manager = near_cache_manager self.lock_reference_id_generator = lock_reference_id_generator self.name = name + self.proxy_session_manager = proxy_session_manager self.reactor = reactor self.compact_schema_service = compact_schema_service diff --git a/hazelcast/internal/asyncio_proxy/cp.py b/hazelcast/internal/asyncio_proxy/cp.py index 333500a641..0138ebcbe6 100644 --- a/hazelcast/internal/asyncio_proxy/cp.py +++ b/hazelcast/internal/asyncio_proxy/cp.py @@ -1,5 +1,21 @@ +import abc +import asyncio + +from hazelcast.cp import _SessionState +from hazelcast.errors import ( + HazelcastClientNotActiveError, + SessionExpiredError, + CPGroupDestroyedError, +) from hazelcast.internal.asyncio_invocation import Invocation -from hazelcast.protocol.codec import cp_group_destroy_cp_object_codec +from hazelcast.protocol import RaftGroupId +from hazelcast.protocol.codec import ( + cp_group_destroy_cp_object_codec, + cp_session_generate_thread_id_codec, + cp_session_close_session_codec, + cp_session_create_session_codec, + cp_session_heartbeat_session_codec, +) def _no_op_response_handler(_): @@ -32,3 +48,182 @@ def _invoke(self, request, response_handler=_no_op_response_handler): async def _ainvoke(self, request, response_handler=_no_op_response_handler): fut = self._invoke(request, response_handler) return await fut + + +class SessionAwareCPProxy(BaseCPProxy, abc.ABC): + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(SessionAwareCPProxy, self).__init__( + context, group_id, service_name, proxy_name, object_name + ) + self._session_manager = context.proxy_session_manager + + def get_group_id(self) -> RaftGroupId: + """ + Returns: + Id of the CP group that runs this proxy. + """ + return self._group_id + + def _get_session_id(self) -> int: + return self._session_manager.get_session_id(self._group_id) + + async def _acquire_session(self, count: int = 1) -> int: + return await self._session_manager.acquire_session(self._group_id, count) + + def _release_session(self, session_id: int, count: int = 1) -> None: + self._session_manager.release_session(self._group_id, session_id, count) + + def _invalidate_session(self, session_id: int) -> None: + self._session_manager.invalidate_session(self._group_id, session_id) + + +_NO_SESSION_ID = -1 + + +class ProxySessionManager: + def __init__(self, context): + self._context = context + self._mutexes = dict() # RaftGroupId to asyncio.Lock + self._sessions = dict() # RaftGroupId to SessionState + self._thread_ids = dict() # (RaftGroupId, thread_id) to global thread id + self._heartbeat_task = None + self._shutdown = False + self._lock = asyncio.Lock() + + def get_session_id(self, group_id): + session = self._sessions.get(group_id, None) + if session is None: + return _NO_SESSION_ID + return session.id + + async def acquire_session(self, group_id, count): + state = await self._get_or_create_session(group_id) + return state.acquire(count) + + def release_session(self, group_id, session_id, count): + session = self._sessions.get(group_id, None) + if session and session.id == session_id: + session.release(count) + + def invalidate_session(self, group_id, session_id): + session = self._sessions.get(group_id, None) + if session and session.id == session_id: + self._sessions.pop(group_id, None) + + async def get_or_create_unique_thread_id(self, group_id): + async with self._lock: + if self._shutdown: + raise HazelcastClientNotActiveError("Session manager is already shut down!") + + # TODO: replace 0 with the lock context once implemented + key = (group_id, 0) + global_thread_id = self._thread_ids.get(key) + if global_thread_id: + return global_thread_id + + tid = await self._request_generate_thread_id(group_id) + return self._thread_ids.setdefault(key, tid) + + async def shutdown(self): + async with self._lock: + if self._shutdown: + return None + + self._shutdown = True + if self._heartbeat_task: + self._heartbeat_task.cancel() + + tasks = [] + async with asyncio.TaskGroup() as tg: + for session in list(self._sessions.values()): + tasks.append( + tg.create_task(self._request_close_session(session.group_id, session.id)) + ) + + self._sessions.clear() + self._mutexes.clear() + self._thread_ids.clear() + + async def _request_generate_thread_id(self, group_id): + codec = cp_session_generate_thread_id_codec + request = codec.encode_request(group_id) + invocation = Invocation(request, response_handler=codec.decode_response) + return await self._context.invocation_service.ainvoke(invocation) + + async def _request_close_session(self, group_id, session_id): + codec = cp_session_close_session_codec + request = codec.encode_request(group_id, session_id) + invocation = Invocation(request, response_handler=codec.decode_response) + return await self._context.invocation_service.ainvoke(invocation) + + async def _get_or_create_session(self, group_id): + async with self._lock: + if self._shutdown: + raise HazelcastClientNotActiveError("Session manager is already shut down!") + + session = self._sessions.get(group_id, None) + if session is None or not session.is_valid(): + async with self._mutex(group_id): + session = self._sessions.get(group_id) + if session is None or not session.is_valid(): + return await self._create_new_session(group_id) + return session + + async def _create_new_session(self, group_id): + response = await self._request_new_session(group_id) + return self._do_create_new_session(response, group_id) + + def _do_create_new_session(self, response, group_id): + session = _SessionState(response["session_id"], group_id, response["ttl_millis"] / 1000.0) + self._sessions[group_id] = session + self._start_heartbeat_timer(response["heartbeat_millis"] / 1000.0) + return session + + async def _request_new_session(self, group_id): + codec = cp_session_create_session_codec + request = codec.encode_request(group_id, self._context.name) + invocation = Invocation(request, response_handler=codec.decode_response) + return await self._context.invocation_service.ainvoke(invocation) + + def _mutex(self, group_id) -> asyncio.Lock: + mutex = self._mutexes.get(group_id, None) + if mutex is not None: + return mutex + + mutex = asyncio.Lock() + current = self._mutexes.setdefault(group_id, mutex) + return current + + def _start_heartbeat_timer(self, period): + if self._heartbeat_task is not None: + return + + async def heartbeat(): + await asyncio.sleep(period) + if self._shutdown: + return + + for session in list(self._sessions.values()): + if session.is_in_use(): + + def cb(heartbeat_future: asyncio.Future, session=session): + error = heartbeat_future.exception() + if error is None: + return + + if isinstance(error, (SessionExpiredError, CPGroupDestroyedError)): + self.invalidate_session(session.group_id, session.id) + + f = self._request_heartbeat(session.group_id, session.id) + f.add_done_callback(cb) + + self._heartbeat_task = asyncio.create_task(heartbeat()) + + self._heartbeat_task = asyncio.create_task(heartbeat()) + + def _request_heartbeat(self, group_id, session_id) -> asyncio.Future: + codec = cp_session_heartbeat_session_codec + request = codec.encode_request(group_id, session_id) + invocation = Invocation(request) + self._context.invocation_service.invoke(invocation) + return invocation.future diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 3749d40ea1..1cd5440860 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -4,12 +4,21 @@ ATOMIC_LONG_SERVICE, ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, + SEMAPHORE_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch -from hazelcast.protocol.codec import cp_group_create_cp_group_codec +from hazelcast.internal.asyncio_proxy.semaphore import ( + Semaphore, + SessionAwareSemaphore, + SessionlessSemaphore, +) +from hazelcast.protocol.codec import ( + cp_group_create_cp_group_codec, + semaphore_get_semaphore_type_codec, +) class CPSubsystem: @@ -100,6 +109,25 @@ async def get_count_down_latch(self, name: str) -> CountDownLatch: """ return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name) + async def get_semaphore(self, name: str) -> Semaphore: + """Returns the distributed Semaphore instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + Semaphore instance will be created on the DEFAULT CP group. + If a group name is given, like ``.get_semaphore("mySemaphore@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name: Name of the Semaphore + + Returns: + The Semaphore proxy for the given name. + """ + return await self._proxy_manager.get_or_create(SEMAPHORE_SERVICE, name) + class CPProxyManager: def __init__(self, context): @@ -116,9 +144,20 @@ async def get_or_create(self, service_name, proxy_name): return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) elif service_name == COUNT_DOWN_LATCH_SERVICE: return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == SEMAPHORE_SERVICE: + return await self._create_semaphore(group_id, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) + async def _create_semaphore(self, group_id, proxy_name, object_name): + codec = semaphore_get_semaphore_type_codec + request = codec.encode_request(proxy_name) + invocation = Invocation(request, response_handler=codec.decode_response) + invocation_service = self._context.invocation_service + jdk_compatible = await invocation_service.ainvoke(invocation) + kls = SessionlessSemaphore if jdk_compatible else SessionAwareSemaphore + return kls(self._context, group_id, SEMAPHORE_SERVICE, proxy_name, object_name) + async def _get_group_id(self, proxy_name): codec = cp_group_create_cp_group_codec request = codec.encode_request(proxy_name) diff --git a/hazelcast/internal/asyncio_proxy/semaphore.py b/hazelcast/internal/asyncio_proxy/semaphore.py new file mode 100644 index 0000000000..a67925f44b --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/semaphore.py @@ -0,0 +1,539 @@ +import time +import uuid + +from hazelcast.errors import IllegalStateError, SessionExpiredError, WaitKeyCancelledError +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy, SessionAwareCPProxy, _NO_SESSION_ID +from hazelcast.protocol.codec import ( + semaphore_init_codec, + semaphore_available_permits_codec, + semaphore_acquire_codec, + semaphore_change_codec, + semaphore_drain_codec, + semaphore_release_codec, +) +from hazelcast.util import check_not_negative, check_true, to_millis + +# Since a proxy does not know how many permits will be drained on +# the Raft group, it uses this constant to increment its local session +# acquire count. Then, it adjusts the local session acquire count after +# the drain response is returned. +_DRAIN_SESSION_ACQ_COUNT = 1024 + + +class Semaphore(BaseCPProxy): + """A linearizable, distributed semaphore. + + Semaphores are often used to restrict the number of callers that can access + some physical or logical resource. + + Semaphore is a cluster-wide counting semaphore. Conceptually, it maintains + a set of permits. Each ``acquire()`` blocks if necessary until a permit + is available, and then takes it. Dually, each ``release()`` adds a + permit, potentially releasing a blocking acquirer. However, no actual permit + objects are used; the semaphore just keeps a count of the number available + and acts accordingly. + + Hazelcast's distributed semaphore implementation guarantees that callers + invoking any of the ``acquire()`` methods are selected to + obtain permits in the order of their invocations (first-in-first-out; FIFO). + Note that FIFO ordering implies the order which the primary replica of an + Semaphore receives these acquire requests. Therefore, it is + possible for one member to invoke ``acquire()`` before another member, + but its request hits the primary replica after the other member. + + This class also provides convenient ways to work with multiple permits at + once. Beware of the increased risk of indefinite postponement when using the + multiple-permit acquire. If permits are released one by one, a caller + waiting for one permit will acquire it before a caller waiting for multiple + permits regardless of the call order. + + Correct usage of a semaphore is established by programming convention + in the application. + + It works on top of the Raft consensus algorithm. It offers linearizability + during crash failures and network partitions. It is CP with respect to the + CAP principle. If a network partition occurs, it remains available on at + most one side of the partition. + + It has 2 variations: + + - The default implementation accessed via ``cp_subsystem`` is session-aware. + In this one, when a caller makes its very first ``acquire()`` call, it + starts a new CP session with the underlying CP group. Then, liveliness of + the caller is tracked via this CP session. When the caller fails, permits + acquired by this caller are automatically and safely released. However, + the session-aware version comes with a limitation, that is, a client + cannot release permits before acquiring them first. In other words, a + client can release only the permits it has acquired earlier. It means, you + can acquire a permit from one thread and release it from another thread + using the same Hazelcast client, but not different instances of Hazelcast + client. You can use the session-aware CP Semaphore implementation by + disabling JDK compatibility via ``jdk-compatible`` server-side setting. + Although the session-aware implementation has a minor difference to the + JDK Semaphore, we think it is a better fit for distributed environments + because of its safe auto-cleanup mechanism for acquired permits. + - The second implementation offered by ``cp_subsystem`` is sessionless. This + implementation does not perform auto-cleanup of acquired permits on + failures. Acquired permits are not bound to threads and permits can be + released without acquiring first. However, you need to handle failed + permit owners on your own. If a Hazelcast server or a client fails while + holding some permits, they will not be automatically released. You can + use the sessionless CP Semaphore implementation by enabling JDK + compatibility via ``jdk-compatible`` server-side setting. + + There is a subtle difference between the lock and semaphore abstractions. + A lock can be assigned to at most one endpoint at a time, so we have a total + order among its holders. However, permits of a semaphore can be assigned to + multiple endpoints at a time, which implies that we may not have a total + order among permit holders. In fact, permit holders are partially ordered. + For this reason, the fencing token approach, which is explained in + :class:`~hazelcast.proxy.cp.fenced_lock.FencedLock`, does not work for the + semaphore abstraction. Moreover, each permit is an independent entity. + Multiple permit acquires and reentrant lock acquires of a single endpoint + are not equivalent. The only case where a semaphore behaves like a lock is + the binary case, where the semaphore has only 1 permit. In this case, the + semaphore works like a non-reentrant lock. + + All of the API methods in the new CP Semaphore implementation offer + the exactly-once execution semantics for the session-aware version. + For instance, even if a ``release()`` call is internally retried + because of a crashed Hazelcast member, the permit is released only once. + However, this guarantee is not given for the sessionless, a.k.a, + JDK-compatible CP Semaphore. + """ + + async def init(self, permits: int) -> bool: + """Tries to initialize this Semaphore instance with the given permit + count. + + Args: + permits: The given permit count. + + Returns: + ``True`` if the initialization succeeds, ``False`` if already + initialized. + + Raises: + AssertionError: If the ``permits`` is negative. + """ + check_not_negative(permits, "Permits must be non-negative") + codec = semaphore_init_codec + request = codec.encode_request(self._group_id, self._object_name, permits) + return await self._ainvoke(request, codec.decode_response) + + async def acquire(self, permits: int = 1) -> None: + """Acquires the given number of permits if they are available, + and returns immediately, reducing the number of available permits + by the given amount. + + If insufficient permits are available then the result of the returned + future is not set until one of the following things happens: + + - Some other caller invokes one of the ``release`` + methods for this semaphore, the current caller is next to be assigned + permits and the number of available permits satisfies this request, + - This Semaphore instance is destroyed + + Args: + permits: Optional number of permits to acquire; defaults to ``1`` + when not specified + + Raises: + AssertionError: If the ``permits`` is not positive. + """ + raise NotImplementedError("acquire") + + async def available_permits(self) -> int: + """Returns the current number of permits currently available in this + semaphore. + + This method is typically used for debugging and testing purposes. + + Returns: + The number of permits available in this semaphore. + """ + codec = semaphore_available_permits_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def drain_permits(self) -> int: + """Acquires and returns all permits that are available at invocation + time. + + Returns: + The number of permits drained. + """ + raise NotImplementedError("drain_permits") + + async def reduce_permits(self, reduction: int) -> None: + """Reduces the number of available permits by the indicated amount. + + This method differs from ``acquire`` as it does not block until permits + become available. Similarly, if the caller has acquired some permits, + they are not released with this call. + + Args: + reduction: The number of permits to reduce. + + Raises: + AssertionError: If the ``reduction`` is negative. + """ + check_not_negative(reduction, "Reduction must be non-negative") + if reduction == 0: + return None + + return await self._do_change_permits(-reduction) + + async def increase_permits(self, increase: int) -> None: + """Increases the number of available permits by the indicated amount. + + If there are some callers waiting for permits to become available, they + will be notified. Moreover, if the caller has acquired some permits, + they are not released with this call. + + Args: + increase: The number of permits to increase. + + Raises: + AssertionError: If ``increase`` is negative. + """ + check_not_negative(increase, "Increase must be non-negative") + if increase == 0: + return None + + return await self._do_change_permits(increase) + + async def release(self, permits: int = 1) -> None: + """Releases the given number of permits and increases the number of + available permits by that amount. + + If some callers in the cluster are blocked for acquiring permits, + they will be notified. + + If the underlying Semaphore implementation is non-JDK-compatible + (configured via ``jdk-compatible`` server-side setting), then a + client can only release a permit which it has acquired before. + In other words, a client cannot release a permit without acquiring + it first. + + Otherwise, which means the underlying implementation is JDK compatible + (configured via ``jdk-compatible`` server-side setting), there is no + requirement that a client that releases a permit must have acquired + that permit by calling one of the ``acquire()`` methods. A client can + freely release a permit without acquiring it first. In this case, + correct usage of a semaphore is established by programming convention + in the application. + + Args: + permits: Optional number of permits to release; defaults to ``1`` + when not specified. + + Raises: + AssertionError: If the ``permits`` is not positive. + IllegalStateError: if the Semaphore is non-JDK-compatible and the + caller does not have a permit + """ + raise NotImplementedError("release") + + async def try_acquire(self, permits: int = 1, timeout: float = 0) -> bool: + """Acquires the given number of permits and returns ``True``, if they + become available during the given waiting time. + + If permits are acquired, the number of available permits in the + Semaphore instance is also reduced by the given amount. + + If no sufficient permits are available, then the result of the returned + future is not set until one of the following things happens: + + - Permits are released by other callers, the current caller is next to + be assigned permits and the number of available permits satisfies this + request + - The specified waiting time elapses + + Args: + permits: The number of permits to acquire; defaults to ``1`` when + not specified. + timeout: Optional timeout in seconds to wait for the permits; when + it's not specified the operation will return immediately after + the acquire attempt. + + Returns: + ``True`` if all permits were acquired, ``False`` if the waiting + time elapsed before all permits could be acquired + + Raises: + AssertionError: If the ``permits`` is not positive. + """ + raise NotImplementedError("try_acquire") + + async def _do_change_permits(self, permits): + raise NotImplementedError("_do_change_permits") + + +class SessionAwareSemaphore(Semaphore, SessionAwareCPProxy): + async def acquire(self, permits=1): + check_true(permits > 0, "Permits must be positive") + # TODO: replace 0 with the lock context once implemented: + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + await self._do_acquire(current_thread_id, invocation_uuid, permits) + + async def drain_permits(self): + # TODO: replace 0 with the lock context once implemented: + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + return await self._do_drain(current_thread_id, invocation_uuid) + + async def release(self, permits=1): + check_true(permits > 0, "Permits must be positive") + session_id = self._get_session_id() + if session_id == _NO_SESSION_ID: + raise self._new_illegal_state_error() + + # TODO: replace 0 with the lock context once implemented: + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + + try: + await self._request_release(session_id, current_thread_id, invocation_uuid, permits) + except SessionExpiredError as e: + self._invalidate_session(session_id) + raise self._new_illegal_state_error(e) + finally: + self._release_session(session_id, permits) + + async def try_acquire(self, permits=1, timeout=0): + check_true(permits > 0, "Permits must be positive") + timeout = max(0.0, timeout) + # TODO: replace 0 with the lock context once implemented: + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + return await self._do_try_acquire(current_thread_id, invocation_uuid, permits, timeout) + + async def _do_acquire(self, current_thread_id, invocation_uuid, permits): + async def do_acquire_once(session_id): + try: + await self._request_acquire( + session_id, current_thread_id, invocation_uuid, permits, -1 + ) + except SessionExpiredError: + self._invalidate_session(session_id) + return await self._do_acquire(current_thread_id, invocation_uuid, permits) + except WaitKeyCancelledError: + self._release_session(session_id, permits) + raise IllegalStateError( + 'Semaphore("%s") not acquired because the acquire call on the CP ' + "group is cancelled, possibly because of another indeterminate call " + "from the same thread." % self._object_name + ) + except Exception as e: + self._release_session(session_id, permits) + raise e + + session_id = await self._acquire_session(permits) + await do_acquire_once(session_id) + + async def _do_drain(self, current_thread_id, invocation_uuid): + async def do_drain_once(session_id): + try: + count = await self._request_drain(session_id, current_thread_id, invocation_uuid) + self._release_session(session_id, _DRAIN_SESSION_ACQ_COUNT - count) + return count + except SessionExpiredError: + self._invalidate_session(session_id) + return await self._do_drain(current_thread_id, invocation_uuid) + except Exception as e: + self._release_session(session_id, _DRAIN_SESSION_ACQ_COUNT) + raise e + + session_id = await self._acquire_session(_DRAIN_SESSION_ACQ_COUNT) + return await do_drain_once(session_id) + + async def _do_change_permits(self, delta): + # TODO: replace 0 with the lock context once implemented: + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + + async def do_change_permits_once(session_id): + try: + await self._request_change(session_id, current_thread_id, invocation_uuid, delta) + except SessionExpiredError as e: + self._invalidate_session(session_id) + raise self._new_illegal_state_error(e) + finally: + self._release_session(session_id) + + session_id = await self._acquire_session() + await do_change_permits_once(session_id) + + async def _do_try_acquire(self, current_thread_id, invocation_uuid, permits, timeout): + start = time.time() + + async def do_try_acquire_once(session_id): + try: + acquired = await self._request_acquire( + session_id, current_thread_id, invocation_uuid, permits, timeout + ) + if not acquired: + self._release_session(session_id, permits) + return acquired + except SessionExpiredError: + self._invalidate_session(session_id) + remaining_timeout = timeout - (time.time() - start) + if remaining_timeout <= 0: + return False + return self._do_try_acquire( + current_thread_id, invocation_uuid, permits, remaining_timeout + ) + except WaitKeyCancelledError: + self._release_session(session_id, permits) + return False + except Exception as e: + self._release_session(session_id, permits) + raise e + + session_id = await self._acquire_session(permits) + return await do_try_acquire_once(session_id) + + def _new_illegal_state_error(self, cause=None): + return IllegalStateError('Semaphore["%s"] has no valid session!' % self._object_name, cause) + + async def _request_acquire( + self, session_id, current_thread_id, invocation_uuid, permits, timeout + ): + codec = semaphore_acquire_codec + if timeout >= 0: + timeout = to_millis(timeout) + + request = codec.encode_request( + self._group_id, + self._object_name, + session_id, + current_thread_id, + invocation_uuid, + permits, + timeout, + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_drain(self, session_id, current_thread_id, invocation_uuid): + codec = semaphore_drain_codec + request = codec.encode_request( + self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_change(self, session_id, current_thread_id, invocation_uuid, delta): + codec = semaphore_change_codec + request = codec.encode_request( + self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid, delta + ) + return await self._ainvoke(request) + + async def _request_release(self, session_id, current_thread_id, invocation_uuid, permits): + codec = semaphore_release_codec + request = codec.encode_request( + self._group_id, + self._object_name, + session_id, + current_thread_id, + invocation_uuid, + permits, + ) + return await self._ainvoke(request) + + +class SessionlessSemaphore(Semaphore): + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(SessionlessSemaphore, self).__init__( + context, group_id, service_name, proxy_name, object_name + ) + self._session_manager = context.proxy_session_manager + + async def acquire(self, permits=1): + check_true(permits > 0, "Permits must be positive") + tid = await self._get_thread_id() + await self._do_try_acquire(tid, permits, -1) + + async def drain_permits(self): + tid = await self._get_thread_id() + return await self._do_drain_permits(tid) + + async def release(self, permits=1): + check_true(permits > 0, "Permits must be positive") + invocation_uuid = uuid.uuid4() + tid = await self._get_thread_id() + return await self._request_release(tid, invocation_uuid, permits) + + async def try_acquire(self, permits=1, timeout=0): + check_true(permits > 0, "Permits must be positive") + timeout = max(0.0, timeout) + tid = await self._get_thread_id() + return await self._do_try_acquire(tid, permits, timeout) + + async def _do_try_acquire(self, global_thread_id, permits, timeout): + invocation_uuid = uuid.uuid4() + try: + return await self._request_acquire(global_thread_id, invocation_uuid, permits, timeout) + except WaitKeyCancelledError: + raise IllegalStateError( + 'Semaphore("%s") not acquired because the acquire call on the ' + "CP group is cancelled, possibly because of another indeterminate " + "call from the same thread." % self._object_name + ) + + async def _do_drain_permits(self, global_thread_id): + invocation_uuid = uuid.uuid4() + codec = semaphore_drain_codec + request = codec.encode_request( + self._group_id, self._object_name, _NO_SESSION_ID, global_thread_id, invocation_uuid + ) + return await self._ainvoke(request, codec.decode_response) + + async def _do_change_permits(self, permits): + invocation_uuid = uuid.uuid4() + tid = await self._get_thread_id() + return await self._request_change(tid, invocation_uuid, permits) + + async def _request_acquire(self, global_thread_id, invocation_uuid, permits, timeout): + codec = semaphore_acquire_codec + if timeout >= 0: + timeout = to_millis(timeout) + + request = codec.encode_request( + self._group_id, + self._object_name, + _NO_SESSION_ID, + global_thread_id, + invocation_uuid, + permits, + timeout, + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_change(self, global_thread_id, invocation_uuid, permits): + codec = semaphore_change_codec + request = codec.encode_request( + self._group_id, + self._object_name, + _NO_SESSION_ID, + global_thread_id, + invocation_uuid, + permits, + ) + return await self._ainvoke(request) + + async def _request_release(self, global_thread_id, invocation_uuid, permits): + codec = semaphore_release_codec + request = codec.encode_request( + self._group_id, + self._object_name, + _NO_SESSION_ID, + global_thread_id, + invocation_uuid, + permits, + ) + return await self._ainvoke(request) + + async def _get_thread_id(self): + return await self._session_manager.get_or_create_unique_thread_id(self._group_id) diff --git a/tests/integration/asyncio/proxy/semaphore_test.py b/tests/integration/asyncio/proxy/semaphore_test.py new file mode 100644 index 0000000000..740cf4c9d5 --- /dev/null +++ b/tests/integration/asyncio/proxy/semaphore_test.py @@ -0,0 +1,271 @@ +import asyncio + +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, IllegalStateError +from hazelcast.internal.asyncio_client import HazelcastClient +from tests.integration.asyncio.base import CPTestCase +from tests.util import random_string, get_current_timestamp + +SEMAPHORE_TYPES = [ + "sessionless", + "sessionaware", +] + + +@pytest.mark.enterprise +class SemaphoreTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.semaphore = None + + async def asyncTearDown(self): + if self.semaphore: + self.semaphore.destroy() + await super().asyncTearDown() + + async def test_semaphore_in_another_group(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 1) + another_semaphore = await self.client.cp_subsystem.get_semaphore( + semaphore._proxy_name + "@another" + ) + self.assertEqual(1, await semaphore.available_permits()) + self.assertEqual(0, await another_semaphore.available_permits()) + await semaphore.acquire() + self.assertEqual(0, await semaphore.available_permits()) + self.assertEqual(0, await semaphore.available_permits()) + + async def test_use_after_destroy(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type) + await semaphore.destroy() + # the next destroy call should be ignored + await semaphore.destroy() + + try: + await semaphore.init(1) + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + semaphore2 = await self.client.cp_subsystem.get_semaphore(semaphore._proxy_name) + + try: + await semaphore2.init(1) + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_session_aware_semaphore_after_client_shutdown(self): + semaphore = await self.get_semaphore("sessionaware", 1) + another_client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + another_semaphore = await another_client.cp_subsystem.get_semaphore(semaphore._proxy_name) + await another_semaphore.acquire(1) + self.assertEqual(0, await another_semaphore.available_permits()) + self.assertEqual(0, await semaphore.available_permits()) + await another_client.shutdown() + + async def assertion(): + self.assertEqual(1, await semaphore.available_permits()) + + await self.assertTrueEventually(assertion) + + async def test_init(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type) + self.assertEqual(0, await semaphore.available_permits()) + self.assertTrue(await semaphore.init(10)) + self.assertEqual(10, await semaphore.available_permits()) + + async def test_init_when_already_initialized(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type) + self.assertTrue(await semaphore.init(5)) + self.assertFalse(await semaphore.init(7)) + self.assertEqual(5, await semaphore.available_permits()) + + async def test_acquire(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 42) + self.assertIsNone(await semaphore.acquire(2)) + self.assertEqual(40, await semaphore.available_permits()) + self.assertIsNone(await semaphore.acquire()) + self.assertEqual(39, await semaphore.available_permits()) + + async def test_acquire_when_not_enough_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 5) + f = asyncio.create_task(semaphore.acquire(10)) + self.assertFalse(f.done()) + await asyncio.sleep(2) + self.assertFalse(f.done()) + await semaphore.destroy() + + try: + await f + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + # TODO: Implement test_acquire_blocks_until_someone_releases after lock context is implemented + # TODO: test_acquire_blocks_until_semaphore_is_destroyed after lock context is implemented + + async def test_available_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type) + self.assertEqual(0, await semaphore.available_permits()) + await semaphore.init(5) + self.assertEqual(5, await semaphore.available_permits()) + await semaphore.acquire(3) + self.assertEqual(2, await semaphore.available_permits()) + + async def test_drain_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 20) + await semaphore.acquire(5) + self.assertEqual(15, await semaphore.drain_permits()) + self.assertEqual(0, await semaphore.available_permits()) + + async def test_drain_permits_when_no_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 0) + self.assertEqual(0, await semaphore.drain_permits()) + + async def test_reduce_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 10) + self.assertIsNone(await semaphore.reduce_permits(5)) + self.assertEqual(5, await semaphore.available_permits()) + self.assertIsNone(await semaphore.reduce_permits(0)) + self.assertEqual(5, await semaphore.available_permits()) + + async def test_reduce_permits_on_negative_permits_counter_sessionless(self): + semaphore = await self.get_semaphore("sessionless", 10) + await semaphore.reduce_permits(15) + self.assertEqual(-5, await semaphore.available_permits()) + await semaphore.release(10) + self.assertEqual(5, await semaphore.available_permits()) + + async def test_reduce_permits_on_negative_permits_counter_juc_sessionless(self): + semaphore = await self.get_semaphore("sessionless", 0) + await semaphore.reduce_permits(100) + await semaphore.release(10) + self.assertEqual(-90, await semaphore.available_permits()) + self.assertEqual(-90, await semaphore.drain_permits()) + await semaphore.release(10) + self.assertEqual(10, await semaphore.available_permits()) + self.assertEqual(10, await semaphore.drain_permits()) + + async def test_reduce_permits_on_negative_permits_counter_session_aware(self): + semaphore = await self.get_semaphore("sessionaware", 10) + await semaphore.reduce_permits(15) + self.assertEqual(-5, await semaphore.available_permits()) + + async def test_reduce_permits_on_negative_permits_counter_juc_session_aware(self): + semaphore = await self.get_semaphore("sessionaware", 0) + await semaphore.reduce_permits(100) + self.assertEqual(-100, await semaphore.available_permits()) + self.assertEqual(-100, await semaphore.drain_permits()) + + async def test_increase_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 10) + self.assertEqual(10, await semaphore.available_permits()) + self.assertIsNone(await semaphore.increase_permits(100)) + self.assertEqual(110, await semaphore.available_permits()) + self.assertIsNone(await semaphore.increase_permits(0)) + self.assertEqual(110, await semaphore.available_permits()) + + async def test_release(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 2) + await semaphore.acquire(2) + self.assertIsNone(await semaphore.release(2)) + self.assertEqual(2, await semaphore.available_permits()) + + async def test_release_when_acquired_by_another_client_sessionless(self): + semaphore = await self.get_semaphore("sessionless") + another_client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + another_semaphore = await another_client.cp_subsystem.get_semaphore(semaphore._proxy_name) + self.assertTrue(await another_semaphore.init(1)) + await another_semaphore.acquire() + + try: + await semaphore.release(1) + self.assertEqual(1, await semaphore.available_permits()) + finally: + await another_client.shutdown() + + async def test_release_when_not_acquired_session_aware(self): + semaphore = await self.get_semaphore("sessionaware", 3) + await semaphore.acquire(1) + + try: + await semaphore.release(2) + except IllegalStateError: + pass + else: + self.fail("expected IllegalStateError to be raised") + + async def test_release_when_there_is_no_session_session_aware(self): + semaphore = await self.get_semaphore("sessionaware", 3) + + try: + await semaphore.release() + except IllegalStateError: + pass + else: + self.fail("expected IllegalStateError to be raised") + + async def test_test_try_acquire(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 5) + self.assertTrue(await semaphore.try_acquire()) + self.assertEqual(4, await semaphore.available_permits()) + + async def test_try_acquire_with_given_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 5) + self.assertTrue(await semaphore.try_acquire(3)) + self.assertEqual(2, await semaphore.available_permits()) + + async def test_try_acquire_when_not_enough_permits(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 1) + self.assertFalse(await semaphore.try_acquire(2)) + self.assertEqual(1, await semaphore.available_permits()) + + async def test_try_acquire_when_not_enough_permits_with_timeout(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 1) + start = get_current_timestamp() + self.assertFalse(await semaphore.try_acquire(2, 1)) + self.assertGreaterEqual(get_current_timestamp() - start, 1) + self.assertEqual(1, await semaphore.available_permits()) + + async def get_semaphore(self, semaphore_type, initialize_with=None): + semaphore = await self.client.cp_subsystem.get_semaphore(semaphore_type + random_string()) + if initialize_with is not None: + await semaphore.init(initialize_with) + self.semaphore = semaphore + return semaphore From 8cd8554cccb883a4e1924617a771c751c883bee5 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Fri, 5 Jun 2026 17:05:08 +0300 Subject: [PATCH 10/12] Review comments --- hazelcast/internal/asyncio_proxy/semaphore.py | 2 +- tests/integration/asyncio/proxy/semaphore_test.py | 4 ++-- .../backward_compatible/proxy/cp/semaphore_test.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/semaphore.py b/hazelcast/internal/asyncio_proxy/semaphore.py index a67925f44b..6d46a41f41 100644 --- a/hazelcast/internal/asyncio_proxy/semaphore.py +++ b/hazelcast/internal/asyncio_proxy/semaphore.py @@ -382,7 +382,7 @@ async def do_try_acquire_once(session_id): remaining_timeout = timeout - (time.time() - start) if remaining_timeout <= 0: return False - return self._do_try_acquire( + return await self._do_try_acquire( current_thread_id, invocation_uuid, permits, remaining_timeout ) except WaitKeyCancelledError: diff --git a/tests/integration/asyncio/proxy/semaphore_test.py b/tests/integration/asyncio/proxy/semaphore_test.py index 740cf4c9d5..5d2e36c938 100644 --- a/tests/integration/asyncio/proxy/semaphore_test.py +++ b/tests/integration/asyncio/proxy/semaphore_test.py @@ -35,7 +35,7 @@ async def test_semaphore_in_another_group(self): self.assertEqual(0, await another_semaphore.available_permits()) await semaphore.acquire() self.assertEqual(0, await semaphore.available_permits()) - self.assertEqual(0, await semaphore.available_permits()) + self.assertEqual(0, await another_semaphore.available_permits()) async def test_use_after_destroy(self): for semaphore_type in SEMAPHORE_TYPES: @@ -233,7 +233,7 @@ async def test_release_when_there_is_no_session_session_aware(self): else: self.fail("expected IllegalStateError to be raised") - async def test_test_try_acquire(self): + async def test_try_acquire(self): for semaphore_type in SEMAPHORE_TYPES: with self.subTest(semaphore_type, semaphore_type=semaphore_type): semaphore = await self.get_semaphore(semaphore_type, 5) diff --git a/tests/integration/backward_compatible/proxy/cp/semaphore_test.py b/tests/integration/backward_compatible/proxy/cp/semaphore_test.py index 0c59ea633d..cb48cf2afc 100644 --- a/tests/integration/backward_compatible/proxy/cp/semaphore_test.py +++ b/tests/integration/backward_compatible/proxy/cp/semaphore_test.py @@ -38,7 +38,7 @@ def test_semaphore_in_another_group(self, semaphore_type): self.assertEqual(0, another_semaphore.available_permits()) semaphore.acquire() self.assertEqual(0, semaphore.available_permits()) - self.assertEqual(0, semaphore.available_permits()) + self.assertEqual(0, another_semaphore.available_permits()) @parameterized.expand(SEMAPHORE_TYPES) def test_use_after_destroy(self, semaphore_type): @@ -258,7 +258,7 @@ def test_release_when_there_is_no_session_session_aware(self): semaphore.release() @parameterized.expand(SEMAPHORE_TYPES) - def test_test_try_acquire(self, semaphore_type): + def test_try_acquire(self, semaphore_type): semaphore = self.get_semaphore(semaphore_type, 5) self.assertTrue(semaphore.try_acquire()) self.assertEqual(4, semaphore.available_permits()) From 520a38f9ed2d3be2ef82ab6bfd7c0cac8684609e Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 8 Jun 2026 10:27:04 +0300 Subject: [PATCH 11/12] Portede the FencedLock proxy to asyncio --- hazelcast/asyncio/__init__.py | 2 + .../internal/asyncio_proxy/cp_manager.py | 40 +++ .../internal/asyncio_proxy/fenced_lock.py | 270 ++++++++++++++++++ .../asyncio/proxy/fenced_lock_test.py | 214 ++++++++++++++ 4 files changed, 526 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/fenced_lock.py create mode 100644 tests/integration/asyncio/proxy/fenced_lock_test.py diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 0cdefdc6e5..21e60cb755 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -5,6 +5,7 @@ "CountDownLatch", "EntryEventCallable", "Executor", + "FencedLock", "HazelcastClient", "List", "Map", @@ -37,3 +38,4 @@ from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch from hazelcast.internal.asyncio_proxy.semaphore import Semaphore +from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 8dfa8c2eba..f306840936 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -1,3 +1,5 @@ +import asyncio + from hazelcast.cp import ( _without_default_group_name, _get_object_name_for_proxy, @@ -5,11 +7,13 @@ ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, SEMAPHORE_SERVICE, + LOCK_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch +from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock from hazelcast.internal.asyncio_proxy.semaphore import ( Semaphore, SessionAwareSemaphore, @@ -109,6 +113,25 @@ async def get_count_down_latch(self, name: str) -> CountDownLatch: """ return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name) + async def get_lock(self, name: str) -> FencedLock: + """Returns the distributed FencedLock instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + FencedLock instance will be created on the DEFAULT CP group. + If a group name is given, like ``.get_lock("myLock@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name: Name of the FencedLock + + Returns: + The FencedLock proxy for the given name. + """ + return await self._proxy_manager.get_or_create(LOCK_SERVICE, name) + async def get_semaphore(self, name: str) -> Semaphore: """Returns the distributed Semaphore instance with given name. @@ -132,6 +155,8 @@ async def get_semaphore(self, name: str) -> Semaphore: class CPProxyManager: def __init__(self, context): self._context = context + self._lock_proxies = dict() # proxy_name to FencedLock + self._mux = asyncio.Lock() # Guards the _lock_proxies async def get_or_create(self, service_name, proxy_name): proxy_name = _without_default_group_name(proxy_name) @@ -144,11 +169,26 @@ async def get_or_create(self, service_name, proxy_name): return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) elif service_name == COUNT_DOWN_LATCH_SERVICE: return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == LOCK_SERVICE: + return await self._create_fenced_lock(group_id, proxy_name, object_name) elif service_name == SEMAPHORE_SERVICE: return await self._create_semaphore(group_id, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) + async def _create_fenced_lock(self, group_id, proxy_name, object_name): + async with self._mux: + proxy = self._lock_proxies.get(proxy_name, None) + if proxy: + if proxy.get_group_id() != group_id: + self._lock_proxies.pop(proxy_name, None) + else: + return proxy + + proxy = FencedLock(self._context, group_id, LOCK_SERVICE, proxy_name, object_name) + self._lock_proxies[proxy_name] = proxy + return proxy + async def _create_semaphore(self, group_id, proxy_name, object_name): codec = semaphore_get_semaphore_type_codec request = codec.encode_request(proxy_name) diff --git a/hazelcast/internal/asyncio_proxy/fenced_lock.py b/hazelcast/internal/asyncio_proxy/fenced_lock.py new file mode 100644 index 0000000000..4d8f94b9c1 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/fenced_lock.py @@ -0,0 +1,270 @@ +import time +import uuid + +from hazelcast.errors import ( + LockOwnershipLostError, + IllegalMonitorStateError, + SessionExpiredError, + WaitKeyCancelledError, + LockAcquireLimitReachedError, +) +from hazelcast.internal.asyncio_proxy.cp import SessionAwareCPProxy +from hazelcast.protocol.codec import ( + fenced_lock_unlock_codec, + fenced_lock_get_lock_ownership_codec, + fenced_lock_try_lock_codec, + fenced_lock_lock_codec, +) +from hazelcast.proxy.cp.fenced_lock import _LockOwnershipState +from hazelcast.util import to_millis + +_NO_SESSION_ID = -1 + + +class FencedLock(SessionAwareCPProxy): + """A linearizable, distributed lock. + + FencedLock is CP with respect to the CAP principle. It works on top + of the Raft consensus algorithm. It offers linearizability during crash-stop + failures and network partitions. If a network partition occurs, it remains + available on at most one side of the partition. + + FencedLock works on top of CP sessions. Please refer to CP Session + documentation section for more information. + + By default, FencedLock is reentrant. Once a caller acquires + the lock, it can acquire the lock reentrantly as many times as it wants + in a linearizable manner. You can configure the reentrancy behaviour + on the member side. For instance, reentrancy can be disabled and + FencedLock can work as a non-reentrant mutex. One can also set + a custom reentrancy limit. When the reentrancy limit is reached, + FencedLock does not block a lock call. Instead, it fails with + ``LockAcquireLimitReachedError`` or a specified return value. + Please check the locking methods to see details about the behaviour. + """ + + INVALID_FENCE = 0 + + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(FencedLock, self).__init__(context, group_id, service_name, proxy_name, object_name) + self._lock_session_ids = dict() # thread-id to session id that has acquired the lock + + async def lock(self) -> int: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + return await self._do_lock(current_thread_id, invocation_uuid) + + async def try_lock(self, timeout: float = 0) -> int: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + timeout = max(0.0, timeout) + return await self._do_try_lock(current_thread_id, invocation_uuid, timeout) + + async def unlock(self) -> None: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + + # the order of the following checks is important + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + if session_id == _NO_SESSION_ID: + self._lock_session_ids.pop(current_thread_id, None) + raise self._new_illegal_monitor_state_error() + + try: + still_locked_by_the_current_thread = await self._request_unlock( + session_id, current_thread_id, uuid.uuid4() + ) + if still_locked_by_the_current_thread: + self._lock_session_ids[current_thread_id] = session_id + else: + self._lock_session_ids.pop(current_thread_id, None) + + self._release_session(session_id) + except SessionExpiredError: + self._invalidate_session(session_id) + self._lock_session_ids.pop(current_thread_id, None) + raise self._new_lock_ownership_lost_error(session_id) + except IllegalMonitorStateError as e: + self._lock_session_ids.pop(current_thread_id, None) + raise e + + async def is_locked(self) -> bool: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + f = await self._request_get_lock_ownership_state() + state = _LockOwnershipState(f) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + return True + + self._verify_no_locked_session_id_present(current_thread_id) + return state.is_locked() + + async def is_locked_by_current_thread(self) -> bool: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + f = await self._request_get_lock_ownership_state() + state = _LockOwnershipState(f) + locked_by_the_current_thread = state.is_locked_by(session_id, current_thread_id) + if locked_by_the_current_thread: + self._lock_session_ids[current_thread_id] = session_id + else: + self._verify_no_locked_session_id_present(current_thread_id) + + return locked_by_the_current_thread + + async def get_lock_count(self) -> int: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + f = await self._request_get_lock_ownership_state() + state = _LockOwnershipState(f) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + else: + self._verify_no_locked_session_id_present(current_thread_id) + + return state.lock_count + + async def destroy(self) -> None: + self._lock_session_ids.clear() + return await super(FencedLock, self).destroy() + + async def _do_lock(self, current_thread_id, invocation_uuid): + async def do_lock_once(session_id): + self._verify_locked_session_id_if_present(current_thread_id, session_id, True) + + try: + fence = await self._request_lock(session_id, current_thread_id, invocation_uuid) + except SessionExpiredError: + self._invalidate_session(session_id) + self._verify_no_locked_session_id_present(current_thread_id) + return await self._do_lock(current_thread_id, invocation_uuid) + except WaitKeyCancelledError: + self._release_session(session_id) + raise IllegalMonitorStateError( + "Lock(%s) not acquired because the lock call on the CP group " + "is cancelled, possibly because of another indeterminate call " + "from the same thread." % self._object_name + ) + except Exception as e: + self._release_session(session_id) + raise e + + if fence != self.INVALID_FENCE: + self._lock_session_ids[current_thread_id] = session_id + return fence + + self._release_session(session_id) + raise LockAcquireLimitReachedError( + "Lock(%s) reentrant lock limit is already reached!" % self._object_name + ) + + session_id = await self._acquire_session() + return await do_lock_once(session_id) + + async def _do_try_lock(self, current_thread_id, invocation_uuid, timeout): + start = time.time() + + async def do_try_lock_once(session_id): + self._verify_locked_session_id_if_present(current_thread_id, session_id, True) + + try: + fence = await self._request_try_lock( + session_id, current_thread_id, invocation_uuid, timeout + ) + except SessionExpiredError: + self._invalidate_session(session_id) + self._verify_no_locked_session_id_present(current_thread_id) + remaining_timeout = timeout - (time.time() - start) + if remaining_timeout <= 0: + return self.INVALID_FENCE + return await self._do_try_lock( + current_thread_id, invocation_uuid, remaining_timeout + ) + except WaitKeyCancelledError: + self._release_session(session_id) + return self.INVALID_FENCE + except Exception as e: + self._release_session(session_id) + raise e + + if fence != self.INVALID_FENCE: + self._lock_session_ids[current_thread_id] = session_id + else: + self._release_session(session_id) + + return fence + + session_id = await self._acquire_session() + return await do_try_lock_once(session_id) + + def _verify_locked_session_id_if_present(self, current_thread_id, session_id, release_session): + lock_session_id = self._lock_session_ids.get(current_thread_id, None) + if lock_session_id and lock_session_id != session_id: + self._lock_session_ids.pop(current_thread_id, None) + if release_session: + self._release_session(session_id) + + raise self._new_lock_ownership_lost_error(lock_session_id) + + def _verify_no_locked_session_id_present(self, current_thread_id): + lock_session_id = self._lock_session_ids.pop(current_thread_id, None) + if lock_session_id: + raise self._new_lock_ownership_lost_error(lock_session_id) + + def _new_lock_ownership_lost_error(self, lock_session_id): + return LockOwnershipLostError( + "Current thread is not the owner of the Lock(%s) because its " + "Session(%s) is closed by the server." % (self._proxy_name, lock_session_id) + ) + + def _new_illegal_monitor_state_error(self): + return IllegalMonitorStateError( + "Current thread is not the owner of the Lock(%s)" % self._proxy_name + ) + + async def _request_lock(self, session_id, current_thread_id, invocation_uuid): + codec = fenced_lock_lock_codec + request = codec.encode_request( + self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_try_lock(self, session_id, current_thread_id, invocation_uuid, timeout): + codec = fenced_lock_try_lock_codec + request = codec.encode_request( + self._group_id, + self._object_name, + session_id, + current_thread_id, + invocation_uuid, + to_millis(timeout), + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_unlock(self, session_id, current_thread_id, invocation_uuid): + codec = fenced_lock_unlock_codec + request = codec.encode_request( + self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_get_lock_ownership_state(self): + codec = fenced_lock_get_lock_ownership_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) diff --git a/tests/integration/asyncio/proxy/fenced_lock_test.py b/tests/integration/asyncio/proxy/fenced_lock_test.py new file mode 100644 index 0000000000..0fc86cac94 --- /dev/null +++ b/tests/integration/asyncio/proxy/fenced_lock_test.py @@ -0,0 +1,214 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, IllegalMonitorStateError +from hazelcast.internal.asyncio_client import HazelcastClient +from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock +from tests.integration.asyncio.base import CPTestCase +from tests.util import random_string + + +@pytest.mark.enterprise +class FencedLockTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.lock = await self.client.cp_subsystem.get_lock(random_string()) + self.initial_acquire_count = self.get_initial_acquire_count() + + async def tearDown(self): + await self.lock.destroy() + await super().asyncTearDown() + + async def test_lock_in_another_group(self): + another_lock = await self.client.cp_subsystem.get_lock(self.lock._proxy_name + "@another") + self.assert_valid_fence(await another_lock.lock()) + try: + self.assertTrue(await another_lock.is_locked()) + self.assertFalse(await self.lock.is_locked()) + finally: + await another_lock.unlock() + + async def test_lock_after_client_shutdown(self): + another_client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + another_lock = await another_client.cp_subsystem.get_lock(self.lock._proxy_name) + self.assert_valid_fence(await another_lock.lock()) + self.assertTrue(await another_lock.is_locked()) + self.assertTrue(await self.lock.is_locked()) + await another_client.shutdown() + + async def assertion(): + self.assertFalse(await self.lock.is_locked()) + + await self.assertTrueEventually(assertion) + + async def test_use_after_destroy(self): + await self.lock.destroy() + # the next destroy call should be ignored + await self.lock.destroy() + + try: + await self.lock.lock() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + lock2 = await self.client.cp_subsystem.get_lock(self.lock._proxy_name) + + try: + await lock2.lock() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_lock_when_not_locked(self): + self.assert_valid_fence(await self.lock.lock()) + await self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + async def test_lock_when_locked_by_self(self): + fence = await self.lock.lock() + self.assert_valid_fence(fence) + fence2 = await self.lock.lock() + self.assertEqual(fence, fence2) + await self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + await self.lock.unlock() + await self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = await self.lock.lock() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + # TODO: test_lock_when_locked_by_another_thread(self): + + async def test_try_lock_when_free(self): + self.assert_valid_fence(await self.lock.try_lock()) + await self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + async def test_try_lock_when_free_with_timeout(self): + self.assert_valid_fence(await self.lock.try_lock(1)) + await self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + async def test_try_lock_when_locked_by_self(self): + fence = await self.lock.lock() + self.assert_valid_fence(fence) + fence2 = await self.lock.try_lock() + self.assertEqual(fence, fence2) + await self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + await self.lock.unlock() + await self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = await self.lock.try_lock() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + async def test_try_lock_when_locked_by_self_with_timeout(self): + fence = await self.lock.lock() + self.assert_valid_fence(fence) + fence2 = await self.lock.try_lock(2) + self.assertEqual(fence, fence2) + await self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + await self.lock.unlock() + await self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = await self.lock.try_lock(2) + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + # TODO: test_try_lock_when_locked_by_another_thread(self): + # TODO: test_try_lock_when_locked_by_another_thread_with_timeout(self): + + async def test_unlock_when_free(self): + try: + await self.lock.unlock() + except IllegalMonitorStateError: + pass + else: + self.fail("expected IllegalMonitorStateError to be raised") + + await self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + async def test_unlock_when_locked_by_self(self): + await self.lock.lock() + self.assertIsNone(await self.lock.unlock()) + + try: + await self.lock.unlock() + except IllegalMonitorStateError: + pass + else: + self.fail("expected IllegalMonitorStateError to be raised") + + await self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + async def test_unlock_multiple_times(self): + await self.lock.lock() + await self.lock.lock() + await self.lock.unlock() + await self.lock.unlock() + await self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + # TODO: test_unlock_when_locked_by_another_thread(self): + + async def test_is_locked_when_free(self): + self.assertFalse(await self.lock.is_locked()) + self.assert_session_acquire_count(0) + + async def test_is_locked_when_locked_by_self(self): + await self.lock.lock() + self.assertTrue(await self.lock.is_locked()) + await self.lock.unlock() + self.assertFalse(await self.lock.is_locked()) + self.assert_session_acquire_count(0) + + # TODO: test_is_locked_when_locked_by_another_thread(self): + + async def test_is_locked_by_current_thread_when_free(self): + self.assertFalse(await self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(0) + + async def test_is_locked_by_current_thread_when_locked_by_self(self): + await self.lock.lock() + self.assertTrue(await self.lock.is_locked_by_current_thread()) + await self.lock.unlock() + self.assertFalse(await self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(0) + + # TODO: def test_is_locked_by_current_thread_when_locked_by_another_thread(self): + + async def assert_lock(self, is_locked, is_locked_by_current_thread, lock_count): + self.assertEqual(is_locked, await self.lock.is_locked()) + self.assertEqual(is_locked_by_current_thread, await self.lock.is_locked_by_current_thread()) + self.assertEqual(lock_count, await self.lock.get_lock_count()) + + def assert_valid_fence(self, fence): + self.assertNotEqual(FencedLock.INVALID_FENCE, fence) + + def assert_not_valid_fence(self, fence): + self.assertEqual(FencedLock.INVALID_FENCE, fence) + + def assert_session_acquire_count(self, expected_acquire_count): + session = self.client._proxy_session_manager._sessions.get(self.lock.get_group_id(), None) + if not session: + actual = 0 + else: + actual = session.acquire_count.get() + + self.assertEqual(self.initial_acquire_count + expected_acquire_count, actual) + + def get_initial_acquire_count(self): + session = self.client._proxy_session_manager._sessions.get(self.lock.get_group_id(), None) + if not session: + return 0 + return session.acquire_count.get() From a293fd9343ceb7512d293d74dedbfe38aa0bbf3f Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 8 Jun 2026 10:29:05 +0300 Subject: [PATCH 12/12] Trivial --- hazelcast/internal/asyncio_proxy/fenced_lock.py | 1 - 1 file changed, 1 deletion(-) diff --git a/hazelcast/internal/asyncio_proxy/fenced_lock.py b/hazelcast/internal/asyncio_proxy/fenced_lock.py index 6b8a98721b..4d8f94b9c1 100644 --- a/hazelcast/internal/asyncio_proxy/fenced_lock.py +++ b/hazelcast/internal/asyncio_proxy/fenced_lock.py @@ -268,4 +268,3 @@ async def _request_get_lock_ownership_state(self): codec = fenced_lock_get_lock_ownership_codec request = codec.encode_request(self._group_id, self._object_name) return await self._ainvoke(request, codec.decode_response) -