@@ -17,6 +17,7 @@ class MessageBuffer:
1717 def __init__ (self , max_num_messages : int = MAX_MESSAGE_BUFFER_SIZE ):
1818 self .max_size = max_num_messages
1919 self .buffer : list [TransportMessage ] = []
20+ self ._has_messages = asyncio .Event ()
2021 self ._space_available_cond = asyncio .Condition ()
2122 self ._closed = False
2223
@@ -35,6 +36,7 @@ def put(self, message: TransportMessage) -> None:
3536 if self ._closed :
3637 raise MessageBufferClosedError ("message buffer is closed" )
3738 self .buffer .append (message )
39+ self ._has_messages .set ()
3840
3941 def get_next_sent_seq (self ) -> int | None :
4042 if self .buffer :
@@ -47,16 +49,25 @@ def peek(self) -> TransportMessage | None:
4749 return None
4850 return self .buffer [0 ]
4951
50- def remove_old_messages (self , min_seq : int ) -> None :
52+ async def remove_old_messages (self , min_seq : int ) -> None :
5153 """Remove messages in the buffer with a seq number less than min_seq."""
5254 self .buffer = [msg for msg in self .buffer if msg .seq >= min_seq ]
55+ if self .buffer :
56+ self ._has_messages .set ()
57+ else :
58+ self ._has_messages .clear ()
5359 async with self ._space_available_cond :
5460 self ._space_available_cond .notify_all ()
5561
56- def close (self ) -> None :
62+ async def block_until_message_available (self ) -> None :
63+ """Allow consumers to avoid spinning unnecessarily"""
64+ await self ._has_messages .wait ()
65+
66+ async def close (self ) -> None :
5767 """
5868 Closes the message buffer and rejects any pending put operations.
5969 """
6070 self ._closed = True
71+ self ._has_messages .set ()
6172 async with self ._space_available_cond :
6273 self ._space_available_cond .notify_all ()
0 commit comments