diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 0cdefdc6e5..21e60cb755 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -5,6 +5,7 @@ "CountDownLatch", "EntryEventCallable", "Executor", + "FencedLock", "HazelcastClient", "List", "Map", @@ -37,3 +38,4 @@ from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch from hazelcast.internal.asyncio_proxy.semaphore import Semaphore +from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py index 8dfa8c2eba..f306840936 100644 --- a/hazelcast/internal/asyncio_proxy/cp_manager.py +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -1,3 +1,5 @@ +import asyncio + from hazelcast.cp import ( _without_default_group_name, _get_object_name_for_proxy, @@ -5,11 +7,13 @@ ATOMIC_REFERENCE_SERVICE, COUNT_DOWN_LATCH_SERVICE, SEMAPHORE_SERVICE, + LOCK_SERVICE, ) from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch +from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock from hazelcast.internal.asyncio_proxy.semaphore import ( Semaphore, SessionAwareSemaphore, @@ -109,6 +113,25 @@ async def get_count_down_latch(self, name: str) -> CountDownLatch: """ return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name) + async def get_lock(self, name: str) -> FencedLock: + """Returns the distributed FencedLock instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + FencedLock instance will be created on the DEFAULT CP group. + If a group name is given, like ``.get_lock("myLock@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name: Name of the FencedLock + + Returns: + The FencedLock proxy for the given name. + """ + return await self._proxy_manager.get_or_create(LOCK_SERVICE, name) + async def get_semaphore(self, name: str) -> Semaphore: """Returns the distributed Semaphore instance with given name. @@ -132,6 +155,8 @@ async def get_semaphore(self, name: str) -> Semaphore: class CPProxyManager: def __init__(self, context): self._context = context + self._lock_proxies = dict() # proxy_name to FencedLock + self._mux = asyncio.Lock() # Guards the _lock_proxies async def get_or_create(self, service_name, proxy_name): proxy_name = _without_default_group_name(proxy_name) @@ -144,11 +169,26 @@ async def get_or_create(self, service_name, proxy_name): return AtomicReference(self._context, group_id, service_name, proxy_name, object_name) elif service_name == COUNT_DOWN_LATCH_SERVICE: return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name) + elif service_name == LOCK_SERVICE: + return await self._create_fenced_lock(group_id, proxy_name, object_name) elif service_name == SEMAPHORE_SERVICE: return await self._create_semaphore(group_id, proxy_name, object_name) raise ValueError("Unknown service name: %s" % service_name) + async def _create_fenced_lock(self, group_id, proxy_name, object_name): + async with self._mux: + proxy = self._lock_proxies.get(proxy_name, None) + if proxy: + if proxy.get_group_id() != group_id: + self._lock_proxies.pop(proxy_name, None) + else: + return proxy + + proxy = FencedLock(self._context, group_id, LOCK_SERVICE, proxy_name, object_name) + self._lock_proxies[proxy_name] = proxy + return proxy + async def _create_semaphore(self, group_id, proxy_name, object_name): codec = semaphore_get_semaphore_type_codec request = codec.encode_request(proxy_name) diff --git a/hazelcast/internal/asyncio_proxy/fenced_lock.py b/hazelcast/internal/asyncio_proxy/fenced_lock.py new file mode 100644 index 0000000000..4d8f94b9c1 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/fenced_lock.py @@ -0,0 +1,270 @@ +import time +import uuid + +from hazelcast.errors import ( + LockOwnershipLostError, + IllegalMonitorStateError, + SessionExpiredError, + WaitKeyCancelledError, + LockAcquireLimitReachedError, +) +from hazelcast.internal.asyncio_proxy.cp import SessionAwareCPProxy +from hazelcast.protocol.codec import ( + fenced_lock_unlock_codec, + fenced_lock_get_lock_ownership_codec, + fenced_lock_try_lock_codec, + fenced_lock_lock_codec, +) +from hazelcast.proxy.cp.fenced_lock import _LockOwnershipState +from hazelcast.util import to_millis + +_NO_SESSION_ID = -1 + + +class FencedLock(SessionAwareCPProxy): + """A linearizable, distributed lock. + + FencedLock is CP with respect to the CAP principle. It works on top + of the Raft consensus algorithm. It offers linearizability during crash-stop + failures and network partitions. If a network partition occurs, it remains + available on at most one side of the partition. + + FencedLock works on top of CP sessions. Please refer to CP Session + documentation section for more information. + + By default, FencedLock is reentrant. Once a caller acquires + the lock, it can acquire the lock reentrantly as many times as it wants + in a linearizable manner. You can configure the reentrancy behaviour + on the member side. For instance, reentrancy can be disabled and + FencedLock can work as a non-reentrant mutex. One can also set + a custom reentrancy limit. When the reentrancy limit is reached, + FencedLock does not block a lock call. Instead, it fails with + ``LockAcquireLimitReachedError`` or a specified return value. + Please check the locking methods to see details about the behaviour. + """ + + INVALID_FENCE = 0 + + def __init__(self, context, group_id, service_name, proxy_name, object_name): + super(FencedLock, self).__init__(context, group_id, service_name, proxy_name, object_name) + self._lock_session_ids = dict() # thread-id to session id that has acquired the lock + + async def lock(self) -> int: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + return await self._do_lock(current_thread_id, invocation_uuid) + + async def try_lock(self, timeout: float = 0) -> int: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + invocation_uuid = uuid.uuid4() + timeout = max(0.0, timeout) + return await self._do_try_lock(current_thread_id, invocation_uuid, timeout) + + async def unlock(self) -> None: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + + # the order of the following checks is important + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + if session_id == _NO_SESSION_ID: + self._lock_session_ids.pop(current_thread_id, None) + raise self._new_illegal_monitor_state_error() + + try: + still_locked_by_the_current_thread = await self._request_unlock( + session_id, current_thread_id, uuid.uuid4() + ) + if still_locked_by_the_current_thread: + self._lock_session_ids[current_thread_id] = session_id + else: + self._lock_session_ids.pop(current_thread_id, None) + + self._release_session(session_id) + except SessionExpiredError: + self._invalidate_session(session_id) + self._lock_session_ids.pop(current_thread_id, None) + raise self._new_lock_ownership_lost_error(session_id) + except IllegalMonitorStateError as e: + self._lock_session_ids.pop(current_thread_id, None) + raise e + + async def is_locked(self) -> bool: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + f = await self._request_get_lock_ownership_state() + state = _LockOwnershipState(f) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + return True + + self._verify_no_locked_session_id_present(current_thread_id) + return state.is_locked() + + async def is_locked_by_current_thread(self) -> bool: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + f = await self._request_get_lock_ownership_state() + state = _LockOwnershipState(f) + locked_by_the_current_thread = state.is_locked_by(session_id, current_thread_id) + if locked_by_the_current_thread: + self._lock_session_ids[current_thread_id] = session_id + else: + self._verify_no_locked_session_id_present(current_thread_id) + + return locked_by_the_current_thread + + async def get_lock_count(self) -> int: + # TODO: port the docs once implemented + # TODO: replace 0 with the lock context once implemented + current_thread_id = 0 + session_id = self._get_session_id() + self._verify_locked_session_id_if_present(current_thread_id, session_id, False) + f = await self._request_get_lock_ownership_state() + state = _LockOwnershipState(f) + if state.is_locked_by(session_id, current_thread_id): + self._lock_session_ids[current_thread_id] = session_id + else: + self._verify_no_locked_session_id_present(current_thread_id) + + return state.lock_count + + async def destroy(self) -> None: + self._lock_session_ids.clear() + return await super(FencedLock, self).destroy() + + async def _do_lock(self, current_thread_id, invocation_uuid): + async def do_lock_once(session_id): + self._verify_locked_session_id_if_present(current_thread_id, session_id, True) + + try: + fence = await self._request_lock(session_id, current_thread_id, invocation_uuid) + except SessionExpiredError: + self._invalidate_session(session_id) + self._verify_no_locked_session_id_present(current_thread_id) + return await self._do_lock(current_thread_id, invocation_uuid) + except WaitKeyCancelledError: + self._release_session(session_id) + raise IllegalMonitorStateError( + "Lock(%s) not acquired because the lock call on the CP group " + "is cancelled, possibly because of another indeterminate call " + "from the same thread." % self._object_name + ) + except Exception as e: + self._release_session(session_id) + raise e + + if fence != self.INVALID_FENCE: + self._lock_session_ids[current_thread_id] = session_id + return fence + + self._release_session(session_id) + raise LockAcquireLimitReachedError( + "Lock(%s) reentrant lock limit is already reached!" % self._object_name + ) + + session_id = await self._acquire_session() + return await do_lock_once(session_id) + + async def _do_try_lock(self, current_thread_id, invocation_uuid, timeout): + start = time.time() + + async def do_try_lock_once(session_id): + self._verify_locked_session_id_if_present(current_thread_id, session_id, True) + + try: + fence = await self._request_try_lock( + session_id, current_thread_id, invocation_uuid, timeout + ) + except SessionExpiredError: + self._invalidate_session(session_id) + self._verify_no_locked_session_id_present(current_thread_id) + remaining_timeout = timeout - (time.time() - start) + if remaining_timeout <= 0: + return self.INVALID_FENCE + return await self._do_try_lock( + current_thread_id, invocation_uuid, remaining_timeout + ) + except WaitKeyCancelledError: + self._release_session(session_id) + return self.INVALID_FENCE + except Exception as e: + self._release_session(session_id) + raise e + + if fence != self.INVALID_FENCE: + self._lock_session_ids[current_thread_id] = session_id + else: + self._release_session(session_id) + + return fence + + session_id = await self._acquire_session() + return await do_try_lock_once(session_id) + + def _verify_locked_session_id_if_present(self, current_thread_id, session_id, release_session): + lock_session_id = self._lock_session_ids.get(current_thread_id, None) + if lock_session_id and lock_session_id != session_id: + self._lock_session_ids.pop(current_thread_id, None) + if release_session: + self._release_session(session_id) + + raise self._new_lock_ownership_lost_error(lock_session_id) + + def _verify_no_locked_session_id_present(self, current_thread_id): + lock_session_id = self._lock_session_ids.pop(current_thread_id, None) + if lock_session_id: + raise self._new_lock_ownership_lost_error(lock_session_id) + + def _new_lock_ownership_lost_error(self, lock_session_id): + return LockOwnershipLostError( + "Current thread is not the owner of the Lock(%s) because its " + "Session(%s) is closed by the server." % (self._proxy_name, lock_session_id) + ) + + def _new_illegal_monitor_state_error(self): + return IllegalMonitorStateError( + "Current thread is not the owner of the Lock(%s)" % self._proxy_name + ) + + async def _request_lock(self, session_id, current_thread_id, invocation_uuid): + codec = fenced_lock_lock_codec + request = codec.encode_request( + self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_try_lock(self, session_id, current_thread_id, invocation_uuid, timeout): + codec = fenced_lock_try_lock_codec + request = codec.encode_request( + self._group_id, + self._object_name, + session_id, + current_thread_id, + invocation_uuid, + to_millis(timeout), + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_unlock(self, session_id, current_thread_id, invocation_uuid): + codec = fenced_lock_unlock_codec + request = codec.encode_request( + self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid + ) + return await self._ainvoke(request, codec.decode_response) + + async def _request_get_lock_ownership_state(self): + codec = fenced_lock_get_lock_ownership_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) diff --git a/tests/integration/asyncio/proxy/fenced_lock_test.py b/tests/integration/asyncio/proxy/fenced_lock_test.py new file mode 100644 index 0000000000..0fc86cac94 --- /dev/null +++ b/tests/integration/asyncio/proxy/fenced_lock_test.py @@ -0,0 +1,214 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError, IllegalMonitorStateError +from hazelcast.internal.asyncio_client import HazelcastClient +from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock +from tests.integration.asyncio.base import CPTestCase +from tests.util import random_string + + +@pytest.mark.enterprise +class FencedLockTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.lock = await self.client.cp_subsystem.get_lock(random_string()) + self.initial_acquire_count = self.get_initial_acquire_count() + + async def tearDown(self): + await self.lock.destroy() + await super().asyncTearDown() + + async def test_lock_in_another_group(self): + another_lock = await self.client.cp_subsystem.get_lock(self.lock._proxy_name + "@another") + self.assert_valid_fence(await another_lock.lock()) + try: + self.assertTrue(await another_lock.is_locked()) + self.assertFalse(await self.lock.is_locked()) + finally: + await another_lock.unlock() + + async def test_lock_after_client_shutdown(self): + another_client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + another_lock = await another_client.cp_subsystem.get_lock(self.lock._proxy_name) + self.assert_valid_fence(await another_lock.lock()) + self.assertTrue(await another_lock.is_locked()) + self.assertTrue(await self.lock.is_locked()) + await another_client.shutdown() + + async def assertion(): + self.assertFalse(await self.lock.is_locked()) + + await self.assertTrueEventually(assertion) + + async def test_use_after_destroy(self): + await self.lock.destroy() + # the next destroy call should be ignored + await self.lock.destroy() + + try: + await self.lock.lock() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + lock2 = await self.client.cp_subsystem.get_lock(self.lock._proxy_name) + + try: + await lock2.lock() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_lock_when_not_locked(self): + self.assert_valid_fence(await self.lock.lock()) + await self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + async def test_lock_when_locked_by_self(self): + fence = await self.lock.lock() + self.assert_valid_fence(fence) + fence2 = await self.lock.lock() + self.assertEqual(fence, fence2) + await self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + await self.lock.unlock() + await self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = await self.lock.lock() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + # TODO: test_lock_when_locked_by_another_thread(self): + + async def test_try_lock_when_free(self): + self.assert_valid_fence(await self.lock.try_lock()) + await self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + async def test_try_lock_when_free_with_timeout(self): + self.assert_valid_fence(await self.lock.try_lock(1)) + await self.assert_lock(True, True, 1) + self.assert_session_acquire_count(1) + + async def test_try_lock_when_locked_by_self(self): + fence = await self.lock.lock() + self.assert_valid_fence(fence) + fence2 = await self.lock.try_lock() + self.assertEqual(fence, fence2) + await self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + await self.lock.unlock() + await self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = await self.lock.try_lock() + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + async def test_try_lock_when_locked_by_self_with_timeout(self): + fence = await self.lock.lock() + self.assert_valid_fence(fence) + fence2 = await self.lock.try_lock(2) + self.assertEqual(fence, fence2) + await self.assert_lock(True, True, 2) + self.assert_session_acquire_count(2) + await self.lock.unlock() + await self.lock.unlock() + self.assert_session_acquire_count(0) + fence3 = await self.lock.try_lock(2) + self.assert_valid_fence(fence3) + self.assertGreater(fence3, fence) + self.assert_session_acquire_count(1) + + # TODO: test_try_lock_when_locked_by_another_thread(self): + # TODO: test_try_lock_when_locked_by_another_thread_with_timeout(self): + + async def test_unlock_when_free(self): + try: + await self.lock.unlock() + except IllegalMonitorStateError: + pass + else: + self.fail("expected IllegalMonitorStateError to be raised") + + await self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + async def test_unlock_when_locked_by_self(self): + await self.lock.lock() + self.assertIsNone(await self.lock.unlock()) + + try: + await self.lock.unlock() + except IllegalMonitorStateError: + pass + else: + self.fail("expected IllegalMonitorStateError to be raised") + + await self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + async def test_unlock_multiple_times(self): + await self.lock.lock() + await self.lock.lock() + await self.lock.unlock() + await self.lock.unlock() + await self.assert_lock(False, False, 0) + self.assert_session_acquire_count(0) + + # TODO: test_unlock_when_locked_by_another_thread(self): + + async def test_is_locked_when_free(self): + self.assertFalse(await self.lock.is_locked()) + self.assert_session_acquire_count(0) + + async def test_is_locked_when_locked_by_self(self): + await self.lock.lock() + self.assertTrue(await self.lock.is_locked()) + await self.lock.unlock() + self.assertFalse(await self.lock.is_locked()) + self.assert_session_acquire_count(0) + + # TODO: test_is_locked_when_locked_by_another_thread(self): + + async def test_is_locked_by_current_thread_when_free(self): + self.assertFalse(await self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(0) + + async def test_is_locked_by_current_thread_when_locked_by_self(self): + await self.lock.lock() + self.assertTrue(await self.lock.is_locked_by_current_thread()) + await self.lock.unlock() + self.assertFalse(await self.lock.is_locked_by_current_thread()) + self.assert_session_acquire_count(0) + + # TODO: def test_is_locked_by_current_thread_when_locked_by_another_thread(self): + + async def assert_lock(self, is_locked, is_locked_by_current_thread, lock_count): + self.assertEqual(is_locked, await self.lock.is_locked()) + self.assertEqual(is_locked_by_current_thread, await self.lock.is_locked_by_current_thread()) + self.assertEqual(lock_count, await self.lock.get_lock_count()) + + def assert_valid_fence(self, fence): + self.assertNotEqual(FencedLock.INVALID_FENCE, fence) + + def assert_not_valid_fence(self, fence): + self.assertEqual(FencedLock.INVALID_FENCE, fence) + + def assert_session_acquire_count(self, expected_acquire_count): + session = self.client._proxy_session_manager._sessions.get(self.lock.get_group_id(), None) + if not session: + actual = 0 + else: + actual = session.acquire_count.get() + + self.assertEqual(self.initial_acquire_count + expected_acquire_count, actual) + + def get_initial_acquire_count(self): + session = self.client._proxy_session_manager._sessions.get(self.lock.get_group_id(), None) + if not session: + return 0 + return session.acquire_count.get()