From cf1e66049caef5a65be1a5e2a21f6ebad065c651 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 2 Jun 2026 11:41:56 +0300 Subject: [PATCH 1/8] AtomicLong for asyncio --- hazelcast/asyncio/__init__.py | 4 + hazelcast/internal/asyncio_client.py | 7 + .../internal/asyncio_proxy/atomic_long.py | 255 ++++++++++++++++++ hazelcast/internal/asyncio_proxy/cp.py | 34 +++ .../internal/asyncio_proxy/cp_manager.py | 79 ++++++ tests/integration/asyncio/base.py | 36 +++ .../asyncio/proxy/atomic_long_test.py | 148 ++++++++++ 7 files changed, 563 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/atomic_long.py create mode 100644 hazelcast/internal/asyncio_proxy/cp.py create mode 100644 hazelcast/internal/asyncio_proxy/cp_manager.py create mode 100644 tests/integration/asyncio/proxy/atomic_long_test.py diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index e06313e9c2..97f42b627f 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -4,6 +4,8 @@ del warnings __all__ = [ + "AtomicLong", + "CPSubsystem", "EntryEventCallable", "Executor", "HazelcastClient", @@ -32,3 +34,5 @@ from hazelcast.internal.asyncio_proxy.set import Set from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener +from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem +from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 5d7bd327f7..bf1adaf417 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -11,6 +11,7 @@ from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation +from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService @@ -203,6 +204,7 @@ def __init__(self, config: Config | None = None, **kwargs): self._compact_schema_service, ) self._proxy_manager = ProxyManager(self._context) + self._cp_subsystem = CPSubsystem(self._context) self._lock_reference_id_generator = AtomicInteger(1) self._statistics = Statistics( self, @@ -519,6 +521,11 @@ def cluster_service(self) -> ClusterService: """ return self._cluster_service + @property + def cp_subsystem(self) -> CPSubsystem: + """CP Subsystem offers set of in-memory linearizable data structures.""" + return self._cp_subsystem + def _create_address_provider(self): config = self._config cluster_members = config.cluster_members diff --git a/hazelcast/internal/asyncio_proxy/atomic_long.py b/hazelcast/internal/asyncio_proxy/atomic_long.py new file mode 100644 index 0000000000..664b097ef7 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/atomic_long.py @@ -0,0 +1,255 @@ +import typing + +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import ( + atomic_long_add_and_get_codec, + atomic_long_compare_and_set_codec, + atomic_long_get_codec, + atomic_long_get_and_add_codec, + atomic_long_get_and_set_codec, + atomic_long_alter_codec, + atomic_long_apply_codec, +) +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_is_int, check_not_none + + +class AtomicLong(BaseCPProxy): + """AtomicLong is a redundant and highly available distributed counter + for 64-bit integers (``long`` type in Java). + + It works on top of the Raft consensus algorithm. It offers linearizability + during crash failures and network partitions. It is CP with respect to + the CAP principle. If a network partition occurs, it remains available + on at most one side of the partition. + + AtomicLong implementation does not offer exactly-once / effectively-once + execution semantics. It goes with at-least-once execution semantics + by default and can cause an API call to be committed multiple times + in case of CP member failures. It can be tuned to offer at-most-once + execution semantics. Please see `fail-on-indeterminate-operation-state` + server-side setting. + """ + + async def add_and_get(self, delta: int) -> int: + """Atomically adds the given value to the current value. + + Args: + delta: The value to add to the current value. + + Returns: + The updated value, the given value added to the current value. + """ + check_is_int(delta) + codec = atomic_long_add_and_get_codec + request = codec.encode_request(self._group_id, self._object_name, delta) + return await self._ainvoke(request, codec.decode_response) + + async def compare_and_set(self, expect: int, update: int) -> bool: + """Atomically sets the value to the given updated value + only if the current value equals the expected value. + + Args: + expect: The expected value. + update: The new value. + + Returns: + ``True`` if successful; or ``False`` if the actual value was not + equal to the expected value. + """ + check_is_int(expect) + check_is_int(update) + codec = atomic_long_compare_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, expect, update) + return await self._ainvoke(request, codec.decode_response) + + async def decrement_and_get(self) -> int: + """Atomically decrements the current value by one. + + Returns: + The updated value, the current value decremented by one. + """ + return await self.add_and_get(-1) + + async def get_and_decrement(self) -> int: + """Atomically decrements the current value by one. + + Returns: + The old value. + """ + return await self.get_and_add(-1) + + async def get(self) -> int: + """Gets the current value. + + Returns: + The current value. + """ + codec = atomic_long_get_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_add(self, delta: int) -> int: + """Atomically adds the given value to the current value. + + Args: + delta: The value to add to the current value. + + Returns: + The old value before the add. + """ + check_is_int(delta) + codec = atomic_long_get_and_add_codec + request = codec.encode_request(self._group_id, self._object_name, delta) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_set(self, new_value: int) -> int: + """Atomically sets the given value and returns the old value. + + Args: + new_value: The new value. + + Returns: + The old value. + """ + check_is_int(new_value) + codec = atomic_long_get_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value) + return await self._ainvoke(request, codec.decode_response) + + async def increment_and_get(self) -> int: + """Atomically increments the current value by one. + + Returns: + The updated value, the current value incremented by one. + """ + return await self.add_and_get(1) + + async def get_and_increment(self) -> int: + """Atomically increments the current value by one. + + Returns: + The old value. + """ + return await self.get_and_add(1) + + async def set(self, new_value: int) -> None: + """Atomically sets the given value. + + Args: + new_value: The new value + """ + check_is_int(new_value) + codec = atomic_long_get_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value) + return await self._ainvoke(request) + + async def alter(self, function: typing.Any) -> None: + """Alters the currently stored value by applying a function on it. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter, function) + + codec = atomic_long_alter_codec + # 1 means return the new value. + # There is no way to tell server to return nothing as of now (30.09.2020) + # The new value is `long` (comes with the initial frame) and we + # don't try to decode it. So, this shouldn't cause any problems. + request = codec.encode_request(self._group_id, self._object_name, function_data, 1) + return await self._ainvoke(request) + + async def alter_and_get(self, function: typing.Any) -> int: + """Alters the currently stored value by applying a function on it and + gets the result. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + + Returns: + The new value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter_and_get, function) + + codec = atomic_long_alter_codec + # 1 means return the new value. + request = codec.encode_request(self._group_id, self._object_name, function_data, 1) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_alter(self, function: typing.Any) -> int: + """Alters the currently stored value by applying a function on it and + gets the old value. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + + Returns: + The old value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_alter, function) + + codec = atomic_long_alter_codec + # 0 means return the old value. + request = codec.encode_request(self._group_id, self._object_name, function_data, 0) + return await self._ainvoke(request, codec.decode_response) + + async def apply(self, function: typing.Any) -> typing.Any: + """Applies a function on the value, the actual stored value will not + change. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function applied to the currently stored value. + + Returns: + The result of the function application. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return self._send_schema_and_retry(e, self.apply, function) + + codec = atomic_long_apply_codec + request = codec.encode_request(self._group_id, self._object_name, function_data) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) diff --git a/hazelcast/internal/asyncio_proxy/cp.py b/hazelcast/internal/asyncio_proxy/cp.py new file mode 100644 index 0000000000..333500a641 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/cp.py @@ -0,0 +1,34 @@ +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.protocol.codec import cp_group_destroy_cp_object_codec + + +def _no_op_response_handler(_): + return None + + +class BaseCPProxy: + def __init__(self, context, group_id, service_name, proxy_name, object_name): + self._group_id = group_id + self._service_name = service_name + self._proxy_name = proxy_name + self._object_name = object_name + self._invocation_service = context.invocation_service + serialization_service = context.serialization_service + self._to_data = serialization_service.to_data + self._to_object = serialization_service.to_object + self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry + + async def destroy(self) -> None: + """Destroys this proxy.""" + codec = cp_group_destroy_cp_object_codec + request = codec.encode_request(self._group_id, self._service_name, self._object_name) + return await self._ainvoke(request) + + def _invoke(self, request, response_handler=_no_op_response_handler): + invocation = Invocation(request, response_handler=response_handler) + self._invocation_service.invoke(invocation) + return invocation.future + + async def _ainvoke(self, request, response_handler=_no_op_response_handler): + fut = self._invoke(request, response_handler) + return await fut diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py new file mode 100644 index 0000000000..dea872be06 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -0,0 +1,79 @@ +from hazelcast.cp import ( + _without_default_group_name, + _get_object_name_for_proxy, + ATOMIC_LONG_SERVICE, +) +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.protocol.codec import cp_group_create_cp_group_codec + + +class CPSubsystem: + """CP Subsystem is a component of Hazelcast that builds a strongly + consistent layer for a set of distributed data structures. + + Its APIs can be used for implementing distributed coordination use cases, + such as leader election, distributed locking, synchronization, and metadata + management. + + Its data structures are CP with respect to the CAP principle, i.e., they + always maintain linearizability and prefer consistency to availability + during network partitions. Besides network partitions, CP Subsystem + withstands server and client failures. + + Data structures in CP Subsystem run in CP groups. Each CP group elects + its own Raft leader and runs the Raft consensus algorithm independently. + + The CP data structures differ from the other Hazelcast data structures + in two aspects. First, an internal commit is performed on the METADATA CP + group every time you fetch a proxy from this interface. Hence, callers + should cache returned proxy objects. Second, if you call ``destroy()`` + on a CP data structure proxy, that data structure is terminated on the + underlying CP group and cannot be reinitialized until the CP group is + force-destroyed. For this reason, please make sure that you are completely + done with a CP data structure before destroying its proxy. + """ + + def __init__(self, context): + self._proxy_manager = CPProxyManager(context) + + async def get_atomic_long(self, name: str) -> AtomicLong: + """Returns the distributed AtomicLong instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + AtomicLong instance will be created on the default CP group. + If a group name is given, like ``.get_atomic_long("myLong@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name: Name of the AtomicLong. + + Returns: + The AtomicLong proxy for the given name. + """ + return await self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name) + + +class CPProxyManager: + def __init__(self, context): + self._context = context + + async def get_or_create(self, service_name, proxy_name): + proxy_name = _without_default_group_name(proxy_name) + object_name = _get_object_name_for_proxy(proxy_name) + + group_id = await self._get_group_id(proxy_name) + if service_name == ATOMIC_LONG_SERVICE: + return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) + + raise ValueError("Unknown service name: %s" % service_name) + + async def _get_group_id(self, proxy_name): + codec = cp_group_create_cp_group_codec + request = codec.encode_request(proxy_name) + invocation = Invocation(request, response_handler=codec.decode_response) + invocation_service = self._context.invocation_service + return await invocation_service.ainvoke(invocation) diff --git a/tests/integration/asyncio/base.py b/tests/integration/asyncio/base.py index 55c92e1b50..8fbb2a87d3 100644 --- a/tests/integration/asyncio/base.py +++ b/tests/integration/asyncio/base.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import unittest from typing import Awaitable @@ -121,3 +122,38 @@ async def asyncSetUp(self): async def asyncTearDown(self): await self.client.shutdown() + + +class CPTestCase(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + + rc = None + cluster = None + client = None + + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, cls.configure_cluster()) + cls.cluster.start_member() + cls.cluster.start_member() + cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open( + os.path.join(dir_path, "../backward_compatible/proxy/cp/hazelcast_cpsubsystem.xml") + ) as f: + return f.read() + + async def asyncSetUp(self): + self.client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + + async def asyncTearDown(self): + await self.client.shutdown() diff --git a/tests/integration/asyncio/proxy/atomic_long_test.py b/tests/integration/asyncio/proxy/atomic_long_test.py new file mode 100644 index 0000000000..5acb0df721 --- /dev/null +++ b/tests/integration/asyncio/proxy/atomic_long_test.py @@ -0,0 +1,148 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError +from hazelcast.serialization.api import IdentifiedDataSerializable +from tests.integration.asyncio.base import CPTestCase +from tests.util import skip_if_server_version_older_than + + +class Multiplication(IdentifiedDataSerializable): + def __init__(self, multiplier): + self.multiplier = multiplier + + def write_data(self, object_data_output): + object_data_output.write_long(self.multiplier) + + def read_data(self, object_data_input): + pass + + def get_factory_id(self): + return 66 + + def get_class_id(self): + return 16 + + +@pytest.mark.enterprise +class AtomicLongTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.atomic_long = await self.client.cp_subsystem.get_atomic_long("long") + + async def asyncTearDown(self): + await self.atomic_long.set(0) + await super().asyncTearDown() + + async def test_atomic_long_in_another_group(self): + another_long = await self.client.cp_subsystem.get_atomic_long("long@mygroup") + self.assertEqual(1, await another_long.increment_and_get()) + # the following value has to be 0, + # as `along` belongs to the default CP group + self.assertEqual(0, await self.atomic_long.get()) + + async def test_use_after_destroy(self): + another_long = await self.client.cp_subsystem.get_atomic_long("another-long") + await another_long.destroy() + # the next destroy call should be ignored + await another_long.destroy() + + try: + await another_long.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + another_long2 = await self.client.cp_subsystem.get_atomic_long("another-long") + try: + await another_long2.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_initial_value(self): + self.assertEqual(0, await self.atomic_long.get()) + + async def test_add_and_get(self): + self.assertEqual(33, await self.atomic_long.add_and_get(33)) + self.assertEqual(33, await self.atomic_long.get()) + + async def test_compare_and_set_when_condition_is_met(self): + self.assertTrue(await self.atomic_long.compare_and_set(0, 23)) + self.assertEqual(23, await self.atomic_long.get()) + + async def test_compare_and_set_when_condition_is_not_met(self): + self.assertFalse(await self.atomic_long.compare_and_set(1, 23)) + self.assertEqual(0, await self.atomic_long.get()) + + async def test_decrement_and_get(self): + self.assertEqual(-1, await self.atomic_long.decrement_and_get()) + self.assertEqual(-2, await self.atomic_long.decrement_and_get()) + self.assertEqual(-2, await self.atomic_long.get()) + + async def test_get_and_decrement(self): + self.assertEqual(0, await self.atomic_long.get_and_decrement()) + self.assertEqual(-1, await self.atomic_long.get_and_decrement()) + self.assertEqual(-2, await self.atomic_long.get()) + + async def test_get(self): + self.assertEqual(0, await self.atomic_long.get()) + await self.atomic_long.set(11) + self.assertEqual(11, await self.atomic_long.get()) + long_max = 2**63 - 1 + await self.atomic_long.set(long_max) + self.assertEqual(long_max, await self.atomic_long.get()) + long_min = -(2**63) + await self.atomic_long.set(long_min) + self.assertEqual(long_min, await self.atomic_long.get()) + + async def test_get_and_add(self): + self.assertEqual(0, await self.atomic_long.get_and_add(-100)) + self.assertEqual(-100, await self.atomic_long.get()) + + async def test_get_and_set(self): + self.assertEqual(0, await self.atomic_long.get_and_set(123)) + self.assertEqual(123, await self.atomic_long.get()) + + async def test_increment_and_get(self): + self.assertEqual(1, await self.atomic_long.increment_and_get()) + self.assertEqual(2, await self.atomic_long.increment_and_get()) + self.assertEqual(2, await self.atomic_long.get()) + + async def test_get_and_increment(self): + self.assertEqual(0, await self.atomic_long.get_and_increment()) + self.assertEqual(1, await self.atomic_long.get_and_increment()) + self.assertEqual(2, await self.atomic_long.get()) + + async def test_set(self): + self.assertIsNone(await self.atomic_long.set(42)) + self.assertEqual(42, await self.atomic_long.get()) + + async def test_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(2) + self.assertIsNone(await self.atomic_long.alter(Multiplication(5))) + self.assertEqual(10, await self.atomic_long.get()) + + async def test_alter_and_get(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(-3) + self.assertEqual(-9, await self.atomic_long.alter_and_get(Multiplication(3))) + self.assertEqual(-9, await self.atomic_long.get()) + + async def test_get_and_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(123) + self.assertEqual(123, await self.atomic_long.get_and_alter(Multiplication(-1))) + self.assertEqual(-123, await self.atomic_long.get()) + + async def test_apply(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(42) + self.assertEqual(84, await self.atomic_long.apply(Multiplication(2))) + self.assertEqual(42, await self.atomic_long.get()) From c493901b6ccc12b48f50ce79c60aa2c7d29812e1 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Tue, 2 Jun 2026 13:39:36 +0300 Subject: [PATCH 2/8] AtomicReference for Asyncio --- .../asyncio_proxy/atomic_reference.py | 277 ++++++++++++++++++ .../internal/asyncio_proxy/cp_manager.py | 24 ++ .../asyncio/proxy/atomic_reference_test.py | 144 +++++++++ 3 files changed, 445 insertions(+) create mode 100644 hazelcast/internal/asyncio_proxy/atomic_reference.py create mode 100644 tests/integration/asyncio/proxy/atomic_reference_test.py diff --git a/hazelcast/internal/asyncio_proxy/atomic_reference.py b/hazelcast/internal/asyncio_proxy/atomic_reference.py new file mode 100644 index 0000000000..0d12013db8 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/atomic_reference.py @@ -0,0 +1,277 @@ +import typing + +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import ( + atomic_ref_compare_and_set_codec, + atomic_ref_get_codec, + atomic_ref_set_codec, + atomic_ref_contains_codec, + atomic_ref_apply_codec, +) +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.types import ElementType +from hazelcast.util import check_not_none + + +class AtomicReference(BaseCPProxy, typing.Generic[ElementType]): + """A distributed, highly available object reference with atomic operations. + + AtomicReference offers linearizability during crash failures and network + partitions. It is CP with respect to the CAP principle. If a network + partition occurs, it remains available on at most one side of the + partition. + + The following are some considerations you need to know when you use + AtomicReference: + + - AtomicReference works based on the byte-content and not on the + object-reference. If you use the ``compare_and_set()`` method, do not + change the original value because its serialized content will then be + different. + - All methods returning an object return a private copy. You can modify the + private copy, but the rest of the world is shielded from your changes. If + you want these changes to be visible to the rest of the world, you need + to write the change back to the AtomicReference; but be careful about + introducing a data-race. + - The in-memory format of an AtomicReference is ``binary``. The receiving + side does not need to have the class definition available unless it needs + to be deserialized on the other side., e.g., because a method like + `alter()` is executed. This deserialization is done for every call that + needs to have the object instead of the binary content, so be careful + with expensive object graphs that need to be deserialized. + - If you have an object with many fields or an object graph, and you only + need to calculate some information or need a subset of fields, you can + use the `apply()` method. With the `apply()` method, the whole object + does not need to be sent over the line; only the information that is + relevant is sent. + + IAtomicReference does not offer exactly-once / effectively-once + execution semantics. It goes with at-least-once execution semantics + by default and can cause an API call to be committed multiple times + in case of CP member failures. It can be tuned to offer at-most-once + execution semantics. Please see `fail-on-indeterminate-operation-state` + server-side setting. + """ + + async def compare_and_set( + self, expect: typing.Optional[ElementType], update: typing.Optional[ElementType] + ) -> bool: + """Atomically sets the value to the given updated value + only if the current value is equal to the expected value. + + Args: + expect: The expected value. + update: The new value. + + Returns: + ``True`` if successful, or ``False`` if the actual value was not + equal to the expected value. + """ + try: + expected_data = self._to_data(expect) + new_data = self._to_data(update) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.compare_and_set, expect, update) + + codec = atomic_ref_compare_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, expected_data, new_data) + return await self._ainvoke(request, codec.decode_response) + + async def get(self) -> typing.Optional[ElementType]: + """Gets the current value. + + Returns: + The current value. + """ + codec = atomic_ref_get_codec + request = codec.encode_request(self._group_id, self._object_name) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def set(self, new_value: typing.Optional[ElementType]) -> None: + """Atomically sets the given value. + + Args: + new_value: The new value. + """ + try: + new_value_data = self._to_data(new_value) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.set, new_value) + + codec = atomic_ref_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value_data, False) + return await self._ainvoke(request) + + async def get_and_set( + self, new_value: typing.Optional[ElementType] + ) -> typing.Optional[ElementType]: + """Gets the old value and sets the new value. + + Args: + new_value: The new value. + + Returns: + The old value. + """ + try: + new_value_data = self._to_data(new_value) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_set, new_value) + + codec = atomic_ref_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value_data, True) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def is_none(self) -> bool: + """Checks if the stored reference is ``None``. + + Returns: + ``True`` if the stored reference is ``None``, ``False`` otherwise. + """ + return await self.contains(None) + + async def clear(self) -> None: + """Clears the current stored reference, so it becomes ``None``.""" + return await self.set(None) + + async def contains(self, value: typing.Optional[ElementType]) -> bool: + """Checks if the reference contains the value. + + Args: + value: The value to check (is allowed to be ``None``). + + Returns: + ``True`` if the value is found, ``False`` otherwise. + """ + try: + value_data = self._to_data(value) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.contains, value) + + codec = atomic_ref_contains_codec + request = codec.encode_request(self._group_id, self._object_name, value_data) + return await self._ainvoke(request, codec.decode_response) + + async def alter(self, function: typing.Any) -> None: + """Alters the currently stored reference by applying a function on it. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored reference. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter, function) + + codec = atomic_ref_apply_codec + # 0 means don't return the value + request = codec.encode_request(self._group_id, self._object_name, function_data, 0, True) + return await self._ainvoke(request) + + async def alter_and_get(self, function: typing.Any) -> typing.Optional[ElementType]: + """Alters the currently stored reference by applying a function on it + and gets the result. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored reference. + + Returns: + The new value, the result of the applied function. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter_and_get, function) + + codec = atomic_ref_apply_codec + # 2 means return the new value + request = codec.encode_request(self._group_id, self._object_name, function_data, 2, True) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def get_and_alter(self, function: typing.Any) -> typing.Optional[ElementType]: + """Alters the currently stored reference by applying a function on it + on and gets the old value. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored reference. + + Returns: + The old value, the value before the function is applied. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_alter, function) + + codec = atomic_ref_apply_codec + # 1 means return the old value + request = codec.encode_request(self._group_id, self._object_name, function_data, 1, True) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) + + async def apply(self, function: typing.Any) -> typing.Optional[ElementType]: + """Applies a function on the value, the actual stored value will not + change. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function applied on the currently stored reference. + + Returns: + The result of the function application. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.apply, function) + + codec = atomic_ref_apply_codec + # 2 means return the new value + request = codec.encode_request(self._group_id, self._object_name, function_data, 2, False) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index dea872be06..4141ec45c7 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -2,9 +2,11 @@ _without_default_group_name, _get_object_name_for_proxy, ATOMIC_LONG_SERVICE, + ATOMIC_REFERENCE_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.protocol.codec import cp_group_create_cp_group_codec @@ -56,6 +58,26 @@ async def get_atomic_long(self, name: str) -> AtomicLong: """ return await self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name) + async def get_atomic_reference(self, name: str) -> AtomicReference: + """Returns the distributed AtomicReference instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + AtomicLong instance will be created on the DEFAULT CP group. + If a group name is given, like + ``.get_atomic_reference("myRef@group1")``, the given group will be + initialized first, if not initialized already, and then the instance + will be created on this group. + + Args: + name: Name of the AtomicReference. + + Returns: + The AtomicReference proxy for the given name. + """ + return await self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name) + class CPProxyManager: def __init__(self, context): @@ -68,6 +90,8 @@ async def get_or_create(self, service_name, proxy_name): group_id = await self._get_group_id(proxy_name) if service_name == ATOMIC_LONG_SERVICE: return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == ATOMIC_REFERENCE_SERVICE: + return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) diff --git a/tests/integration/asyncio/proxy/atomic_reference_test.py b/tests/integration/asyncio/proxy/atomic_reference_test.py new file mode 100644 index 0000000000..404bab4980 --- /dev/null +++ b/tests/integration/asyncio/proxy/atomic_reference_test.py @@ -0,0 +1,144 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, ClassCastError +from tests.integration.asyncio.base import CPTestCase +from tests.integration.backward_compatible.proxy.cp.atomic_reference_test import AppendString +from tests.util import skip_if_server_version_older_than + + +@pytest.mark.enterprise +class AtomicReferenceTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.ref = await self.client.cp_subsystem.get_atomic_reference("ref") + + async def asyncTearDown(self): + await self.ref.clear() + await super().asyncTearDown() + + async def test_ref_in_another_group(self): + another_ref = await self.client.cp_subsystem.get_atomic_reference("ref@mygroup") + await another_ref.set("hey") + self.assertEqual("hey", await another_ref.get()) + # the following value has to be None, + # as `ref` belongs to the default CP group + self.assertIsNone(await self.ref.get()) + + async def test_use_after_destroy(self): + another_ref = await self.client.cp_subsystem.get_atomic_reference("another-ref") + await another_ref.destroy() + # the next destroy call should be ignored + await another_ref.destroy() + + try: + await another_ref.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + another_ref2 = await self.client.cp_subsystem.get_atomic_reference("another-ref") + + try: + await another_ref2.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_initial_value(self): + self.assertIsNone(await self.ref.get()) + + async def test_compare_and_set_when_condition_is_met(self): + self.assertTrue(await self.ref.compare_and_set(None, 42)) + self.assertEqual(42, await self.ref.get()) + self.assertTrue(await self.ref.compare_and_set(42, "hey")) + self.assertEqual("hey", await self.ref.get()) + + async def test_compare_and_set_when_condition_is_not_met(self): + self.assertFalse(await self.ref.compare_and_set(42, 23)) + self.assertIsNone(await self.ref.get()) + await self.ref.set("a") + self.assertFalse(await self.ref.compare_and_set("b", "c")) + self.assertEqual("a", await self.ref.get()) + + async def test_get(self): + await self.ref.set([1, 2, 3]) + self.assertEqual([1, 2, 3], await self.ref.get()) + + async def test_set(self): + self.assertIsNone(await self.ref.set("abc")) + self.assertEqual("abc", await self.ref.get()) + self.assertIsNone(await self.ref.set(["another_type", 1])) + self.assertEqual(["another_type", 1], await self.ref.get()) + + async def test_get_and_set(self): + self.assertIsNone(await self.ref.get_and_set("42")) + self.assertEqual("42", await self.ref.get()) + self.assertEqual("42", await self.ref.get_and_set(42)) + self.assertEqual(42, await self.ref.get()) + + async def test_is_none(self): + self.assertTrue(await self.ref.is_none()) + await self.ref.set(11) + self.assertFalse(await self.ref.is_none()) + await self.ref.set(None) + self.assertTrue(await self.ref.is_none()) + + async def test_clear(self): + self.assertIsNone(await self.ref.clear()) + self.assertIsNone(await self.ref.get()) + await self.ref.set("str") + self.assertEqual("str", await self.ref.get()) + self.assertIsNone(await self.ref.clear()) + self.assertIsNone(await self.ref.get()) + + async def test_contains(self): + self.assertTrue(await self.ref.contains(None)) + await self.ref.set("42") + self.assertTrue(await self.ref.contains("42")) + self.assertFalse(await self.ref.contains(42)) + self.assertFalse(await self.ref.contains(None)) + await self.ref.clear() + self.assertFalse(await self.ref.contains("42")) + self.assertTrue(await self.ref.contains(None)) + + async def test_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("hey") + self.assertIsNone(await self.ref.alter(AppendString("123"))) + self.assertEqual("hey123", await self.ref.get()) + + async def test_alter_with_incompatible_types(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set(42) + + try: + await self.ref.alter(AppendString(".")) + except ClassCastError: + pass + else: + self.fail("expected ClassCastError to be raised") + + async def test_alter_and_get(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("123") + self.assertEqual("123...", await self.ref.alter_and_get(AppendString("..."))) + self.assertEqual("123...", await self.ref.get()) + + async def test_get_and_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("hell") + self.assertEqual("hell", await self.ref.get_and_alter(AppendString("o"))) + self.assertEqual("hello", await self.ref.get()) + + async def test_apply(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.ref.set("hell") + self.assertEqual("hello", await self.ref.apply(AppendString("o"))) + self.assertEqual("hell", await self.ref.get()) From 4de45d196fd9ed39ecb80a3aa32a69175ce6f498 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 11:50:35 +0300 Subject: [PATCH 3/8] Added CountdownLatch for Asyncio --- .../internal/asyncio_proxy/countdown_latch.py | 138 +++++++++++++++ .../internal/asyncio_proxy/cp_manager.py | 25 ++- .../asyncio/proxy/countdown_latch_test.py | 159 ++++++++++++++++++ 3 files changed, 321 insertions(+), 1 deletion(-) create mode 100644 hazelcast/internal/asyncio_proxy/countdown_latch.py create mode 100644 tests/integration/asyncio/proxy/countdown_latch_test.py diff --git a/hazelcast/internal/asyncio_proxy/countdown_latch.py b/hazelcast/internal/asyncio_proxy/countdown_latch.py new file mode 100644 index 0000000000..3d8236eff0 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/countdown_latch.py @@ -0,0 +1,138 @@ +import uuid + +from hazelcast.errors import OperationTimeoutError +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \ + count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec +from hazelcast.util import check_is_number, to_millis, check_is_int, check_true + + +class CountDownLatch(BaseCPProxy): + """A distributed, concurrent countdown latch data structure. + + CountDownLatch is a cluster-wide synchronization aid + that allows one or more callers to wait until a set of operations being + performed in other callers completes. + + CountDownLatch count can be reset using ``try_set_count()`` method after + a countdown has finished but not during an active count. This allows + the same latch instance to be reused. + + There is no ``await_latch()`` method to wait indefinitely since this is + undesirable in a distributed application: for example, a cluster can split + or the master and replicas could all terminate. In most cases, it is best + to configure an explicit timeout, so you have the ability to deal with + these situations. + + All the API methods in the CountDownLatch offer the exactly-once + execution semantics. For instance, even if a ``count_down()`` call is + internally retried because of crashed Hazelcast member, the counter + value is decremented only once. + """ + + async def await_latch(self, timeout: float) -> bool: + """Causes the current thread to wait until the latch has counted down to + zero, or an exception is thrown, or the specified waiting time elapses. + + If the current count is zero then this method returns ``True``. + + If the current count is greater than zero, then the current + thread becomes disabled for thread scheduling purposes and lies + dormant until one of the following things happen: + + - The count reaches zero due to invocations of the ``count_down()`` + method + - This CountDownLatch instance is destroyed + - The countdown owner becomes disconnected + - The specified waiting time elapses + + If the count reaches zero, then the method returns with the + value ``True``. + + If the specified waiting time elapses then the value ``False`` + is returned. If the time is less than or equal to zero, the method + will not wait at all. + + Args: + timeout: The maximum time to wait in seconds + + Returns: + ``True`` if the count reached zero, ``False`` if the waiting time + elapsed before the count reached zero + Raises: + IllegalStateError: If the Hazelcast instance was shut down while + waiting. + """ + check_is_number(timeout) + timeout = max(0.0, timeout) + invocation_uuid = uuid.uuid4() + codec = count_down_latch_await_codec + request = codec.encode_request( + self._group_id, self._object_name, invocation_uuid, to_millis(timeout) + ) + return await self._ainvoke(request, codec.decode_response) + + async def count_down(self) -> None: + """Decrements the count of the latch, releasing all waiting threads if + the count reaches zero. + + If the current count is greater than zero, then it is decremented. + If the new count is zero: + + - All waiting threads are re-enabled for thread scheduling purposes + - Countdown owner is set to ``None``. + + If the current count equals zero, then nothing happens. + """ + invocation_uuid = uuid.uuid4() + res = await self._get_round() + return await self._do_count_down(res, invocation_uuid) + + async def get_count(self) -> int: + """Returns the current count. + + Returns: + The current count. + """ + codec = count_down_latch_get_count_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def try_set_count(self, count: int) -> bool: + """Sets the count to the given value if the current count is zero. + + If count is not zero, then this method does nothing and returns + ``False``. + + Args: + count: The number of times ``count_down()`` must be invoked before + callers can pass through ``await_latch()``. + + Returns: + ``True`` if the new count was set, ``False`` if the current count + is not zero. + """ + check_is_int(count) + check_true(count > 0, "Count must be positive") + codec = count_down_latch_try_set_count_codec + request = codec.encode_request(self._group_id, self._object_name, count) + return await self._ainvoke(request, codec.decode_response) + + async def _do_count_down(self, expected_round, invocation_uuid): + try: + return await self._request_count_down(expected_round, invocation_uuid) + except OperationTimeoutError: + # we can retry safely because the retry is idempotent + return await self._do_count_down(expected_round, invocation_uuid) + + async def _get_round(self): + codec = count_down_latch_get_round_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def _request_count_down(self, expected_round, invocation_uuid): + codec = count_down_latch_count_down_codec + request = codec.encode_request( + self._group_id, self._object_name, invocation_uuid, expected_round + ) + return await self._ainvoke(request) diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 4141ec45c7..39d36ae525 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -2,11 +2,12 @@ _without_default_group_name, _get_object_name_for_proxy, ATOMIC_LONG_SERVICE, - ATOMIC_REFERENCE_SERVICE, + ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference +from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch from hazelcast.protocol.codec import cp_group_create_cp_group_codec @@ -78,6 +79,26 @@ async def get_atomic_reference(self, name: str) -> AtomicReference: """ return await self._proxy_manager.get_or_create(ATOMIC_REFERENCE_SERVICE, name) + async def get_count_down_latch(self, name: str) -> CountDownLatch: + """Returns the distributed CountDownLatch instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + CountDownLatch instance will be created on the DEFAULT CP group. + If a group name is given, like + ``.get_count_down_latch("myLatch@group1")``, the given group will be + initialized first, if not initialized already, and then the instance + will be created on this group. + + Args: + name: Name of the CountDownLatch. + + Returns: + The CountDownLatch proxy for the given name. + """ + return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name) + class CPProxyManager: def __init__(self, context): @@ -92,6 +113,8 @@ async def get_or_create(self, service_name, proxy_name): return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) elif service_name == ATOMIC_REFERENCE_SERVICE: return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == COUNT_DOWN_LATCH_SERVICE: + return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) diff --git a/tests/integration/asyncio/proxy/countdown_latch_test.py b/tests/integration/asyncio/proxy/countdown_latch_test.py new file mode 100644 index 0000000000..8aac90428f --- /dev/null +++ b/tests/integration/asyncio/proxy/countdown_latch_test.py @@ -0,0 +1,159 @@ +import asyncio +import os + +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, OperationTimeoutError +from hazelcast.util import AtomicInteger +from tests.integration.asyncio.base import CPTestCase +from tests.util import random_string, get_current_timestamp + +inf = 2**31 - 1 + + +@pytest.mark.enterprise +class CountDownLatchTest(CPTestCase): + + async def test_latch_in_another_group(self): + latch = await self.get_latch() + another_latch = await self.client.cp_subsystem.get_count_down_latch( + latch._proxy_name + "@another" + ) + await another_latch.try_set_count(42) + self.assertEqual(42, await another_latch.get_count()) + self.assertNotEqual(42, await latch.get_count()) + + async def test_use_after_destroy(self): + latch = await self.get_latch() + await latch.destroy() + # the next destroy call should be ignored + await latch.destroy() + + try: + await latch.get_count() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + latch2 = await self.client.cp_subsystem.get_count_down_latch(latch._proxy_name) + + try: + await latch2.get_count() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_await_latch_negative_timeout(self): + latch = await self.get_latch(1) + self.assertFalse(await latch.await_latch(-1)) + + async def test_await_latch_zero_timeout(self): + latch = await self.get_latch(1) + self.assertFalse(await latch.await_latch(0)) + + async def test_await_latch_with_timeout(self): + timeout = 1 + latch = await self.get_latch(1) + start = get_current_timestamp() + self.assertFalse(await latch.await_latch(timeout)) + time_passed = get_current_timestamp() - start + expected_time_passed = timeout + if os.name == "nt": + # On Windows, we were getting random test failures due to expected + # time passed being slightly less than the timeout. This is due to + # the low time resolution there (15-16ms). If we are on Windows, we + # lower our expectations and settle for a slightly lower value. + expected_time_passed *= 0.95 + + self.assertTrue( + time_passed >= expected_time_passed, + "Time passed is less than %s, which is %s" % (expected_time_passed, time_passed), + ) + + async def test_await_latch_multiple_waiters(self): + latch = await self.get_latch(1) + # TODO: replace the following with the asyncio variant when implemented + completed = AtomicInteger() + + async def run(): + await latch.await_latch(inf) + completed.get_and_increment() + + count = 10 + tasks = [] + for _ in range(count): + tasks.append(asyncio.create_task(run())) + + await latch.count_down() + + def assertion(): + self.assertEqual(count, completed.get()) + + await self.assertTrueEventually(assertion) + + async def test_await_latch_response_on_count_down(self): + latch = await self.get_latch() + self.assertTrue(await latch.await_latch(inf)) + self.assertTrue(await latch.try_set_count(1)) + # make a non-blocking request + future = asyncio.create_task(latch.await_latch(inf)) + asyncio.create_task(latch.count_down()) + self.assertTrue(await future) + + async def test_count_down(self): + latch = await self.get_latch(10) + + for i in range(9, -1, -1): + self.assertIsNone(await latch.count_down()) + self.assertEqual(i, await latch.get_count()) + + async def test_count_down_retry_on_timeout(self): + latch = await self.get_latch(1) + original = latch._request_count_down + # TODO: replace the following with the asyncio variant when implemented + called_count = AtomicInteger() + + async def mock(expected_round, invocation_uuid): + if called_count.get_and_increment() < 2: + raise OperationTimeoutError("xx") + return await original(expected_round, invocation_uuid) + + latch._request_count_down = mock + await latch.count_down() + # Will resolve on it's third call. First 2 throws timeout error + self.assertEqual(3, called_count.get()) + self.assertEqual(0, await latch.get_count()) + + async def test_get_count(self): + latch = await self.get_latch(1) + self.assertEqual(1, await latch.get_count()) + await latch.count_down() + self.assertEqual(0, await latch.get_count()) + await latch.try_set_count(10) + self.assertEqual(10, await latch.get_count()) + + async def test_try_set_count(self): + latch = await self.get_latch() + self.assertTrue(await latch.try_set_count(3)) + self.assertEqual(3, await latch.get_count()) + + async def test_try_set_count_when_count_is_already_set(self): + latch = await self.get_latch(1) + self.assertFalse(await latch.try_set_count(10)) + self.assertFalse(await latch.try_set_count(20)) + self.assertEqual(1, await latch.get_count()) + + async def test_try_set_count_when_count_goes_to_zero(self): + latch = await self.get_latch(1) + await latch.count_down() + self.assertEqual(0, await latch.get_count()) + self.assertTrue(await latch.try_set_count(3)) + self.assertEqual(3, await latch.get_count()) + + async def get_latch(self, initial_count=None): + latch = await self.client.cp_subsystem.get_count_down_latch("latch-" + random_string()) + if initial_count is not None: + self.assertTrue(await latch.try_set_count(initial_count)) + return latch From aac1fc3cf8573217f531c19fa12ac04ed78fe147 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 11:51:17 +0300 Subject: [PATCH 4/8] black --- hazelcast/internal/asyncio_proxy/countdown_latch.py | 9 +++++++-- hazelcast/internal/asyncio_proxy/cp_manager.py | 3 ++- tests/integration/asyncio/proxy/countdown_latch_test.py | 1 - 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/hazelcast/internal/asyncio_proxy/countdown_latch.py b/hazelcast/internal/asyncio_proxy/countdown_latch.py index 3d8236eff0..28a7956adf 100644 --- a/hazelcast/internal/asyncio_proxy/countdown_latch.py +++ b/hazelcast/internal/asyncio_proxy/countdown_latch.py @@ -2,8 +2,13 @@ from hazelcast.errors import OperationTimeoutError from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy -from hazelcast.protocol.codec import count_down_latch_await_codec, count_down_latch_get_round_codec, \ - count_down_latch_count_down_codec, count_down_latch_get_count_codec, count_down_latch_try_set_count_codec +from hazelcast.protocol.codec import ( + count_down_latch_await_codec, + count_down_latch_get_round_codec, + count_down_latch_count_down_codec, + count_down_latch_get_count_codec, + count_down_latch_try_set_count_codec, +) from hazelcast.util import check_is_number, to_millis, check_is_int, check_true diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 39d36ae525..3749d40ea1 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -2,7 +2,8 @@ _without_default_group_name, _get_object_name_for_proxy, ATOMIC_LONG_SERVICE, - ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, + ATOMIC_REFERENCE_SERVICE, + COUNT_DOWN_LATCH_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong diff --git a/tests/integration/asyncio/proxy/countdown_latch_test.py b/tests/integration/asyncio/proxy/countdown_latch_test.py index 8aac90428f..74ec5f11bb 100644 --- a/tests/integration/asyncio/proxy/countdown_latch_test.py +++ b/tests/integration/asyncio/proxy/countdown_latch_test.py @@ -13,7 +13,6 @@ @pytest.mark.enterprise class CountDownLatchTest(CPTestCase): - async def test_latch_in_another_group(self): latch = await self.get_latch() another_latch = await self.client.cp_subsystem.get_count_down_latch( From d3b6199c7797c48d58736f40e2ec4088a0a7f8e6 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 11:54:52 +0300 Subject: [PATCH 5/8] CountdownLatch in public API --- hazelcast/asyncio/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index ba675e0c2e..e18bc367cb 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -1,6 +1,7 @@ __all__ = [ "AtomicLong", "CPSubsystem", + "CountDownLatch", "EntryEventCallable", "Executor", "HazelcastClient", @@ -31,3 +32,4 @@ from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch From 90190948032a6e9f9313b1e12528fe439bb96cfc Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 12:10:46 +0300 Subject: [PATCH 6/8] AtomicReference in public API --- hazelcast/asyncio/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index ba675e0c2e..89b0cc629c 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -1,5 +1,6 @@ __all__ = [ "AtomicLong", + "AtomicReference", "CPSubsystem", "EntryEventCallable", "Executor", @@ -31,3 +32,4 @@ from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference \ No newline at end of file From 6c6b6c8e9f7c65cadbb9890a593ebcc2b2c87695 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 12:14:02 +0300 Subject: [PATCH 7/8] black --- hazelcast/asyncio/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 89b0cc629c..cc94195cc0 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -32,4 +32,4 @@ from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong -from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference \ No newline at end of file +from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference From 18a4c4dc333c49f1dd660eac160c50b5a40ae9f3 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 3 Jun 2026 14:53:24 +0300 Subject: [PATCH 8/8] review comment --- hazelcast/internal/asyncio_proxy/atomic_long.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/internal/asyncio_proxy/atomic_long.py b/hazelcast/internal/asyncio_proxy/atomic_long.py index 664b097ef7..7bf8190ddc 100644 --- a/hazelcast/internal/asyncio_proxy/atomic_long.py +++ b/hazelcast/internal/asyncio_proxy/atomic_long.py @@ -244,7 +244,7 @@ async def apply(self, function: typing.Any) -> typing.Any: try: function_data = self._to_data(function) except SchemaNotReplicatedError as e: - return self._send_schema_and_retry(e, self.apply, function) + return await self._send_schema_and_retry(e, self.apply, function) codec = atomic_long_apply_codec request = codec.encode_request(self._group_id, self._object_name, function_data)