-
Notifications
You must be signed in to change notification settings - Fork 762
fix: Add is_finished and correct is_empty semantics in RequestQueueClient
#1982
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
d8ddc93
59f6e9b
6433c37
7760d62
5361919
e913621
d69fff2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -595,10 +595,34 @@ async def reclaim_request( | |
| @retry_on_error(SQLAlchemyError) | ||
| @override | ||
| async def is_empty(self) -> bool: | ||
| # Check in-memory cache for requests | ||
| # Requests buffered for fetching mean the queue is not empty. | ||
| if self._pending_fetch_cache: | ||
| return False | ||
|
|
||
| now = datetime.now(timezone.utc) | ||
|
|
||
| # Check if there are any unhandled requests that are not blocked. | ||
| async with self.get_session(with_simple_commit=True) as session: | ||
| stmt = select( | ||
| exists().where( | ||
| self._ITEM_TABLE.request_queue_id == self._id, | ||
| self._ITEM_TABLE.is_handled == False, # noqa: E712 | ||
| or_(self._ITEM_TABLE.time_blocked_until.is_(None), self._ITEM_TABLE.time_blocked_until < now), | ||
| ) | ||
| ) | ||
| result = await session.execute(stmt) | ||
|
|
||
| await self._add_buffer_record(session) | ||
|
|
||
| return not result.scalar() | ||
|
|
||
| @retry_on_error(SQLAlchemyError) | ||
| @override | ||
| async def is_finished(self) -> bool: | ||
| # If the queue is not empty, it is not finished | ||
| if not await self.is_empty(): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Maybe we could handle the empty check directly inside
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Getting a session from the pool is quite cheap, but it can actually impact performance if the pool is small. However, this will only become apparent when the queue is empty but not yet finished. But merging If we need to optimize these methods, I would consider removing the |
||
| return False | ||
|
|
||
| metadata = await self.get_metadata() | ||
|
|
||
| async with self.get_session(with_simple_commit=True) as session: | ||
|
|
@@ -629,7 +653,8 @@ async def is_empty(self) -> bool: | |
| has_pending_buffer_updates = buffer_result.scalar() | ||
|
|
||
| await self._add_buffer_record(session) | ||
| # If there are no pending requests and no buffered updates, the queue is empty | ||
|
|
||
| # If there are no pending requests and no buffered updates, the queue is finished | ||
| return not has_pending_buffer_updates | ||
|
|
||
| # There are pending requests (may be inaccurate), ensure recalculated metadata | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.