Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.utils.byte_limited_queue import ByteLimitedQueue

if TYPE_CHECKING:
import apache_beam.coders.slow_stream
Expand Down Expand Up @@ -455,11 +456,14 @@ class _GrpcDataChannel(DataChannel):

def __init__(self, data_buffer_time_limit_ms=0):
# type: (int) -> None

self._data_buffer_time_limit_ms = data_buffer_time_limit_ms
self._to_send = queue.Queue() # type: queue.Queue[DataOrTimers]
self._to_send = ByteLimitedQueue(
maxsize=10000,
maxbytes=100 << 20) # type: ByteLimitedQueue[DataOrTimers]
self._received = collections.defaultdict(
lambda: queue.Queue(maxsize=5)
) # type: DefaultDict[str, queue.Queue[DataOrTimers]]
lambda: ByteLimitedQueue(maxsize=5, maxbytes=100 << 20)
) # type: DefaultDict[str, ByteLimitedQueue[DataOrTimers]]

# Keep a cache of completed instructions. Data for completed instructions
# must be discarded. See input_elements() and _clean_receiving_queue().
Expand All @@ -474,15 +478,15 @@ def __init__(self, data_buffer_time_limit_ms=0):

def close(self):
  # type: () -> None

  """Marks the channel closed and enqueues the writes-finished sentinel.

  The sentinel is accounted as zero bytes so that closing can never block
  on the queue's byte limit.
  """
  # NOTE: the scraped diff contained both the old one-arg and new two-arg
  # put calls; only the two-arg form matches ByteLimitedQueue.put.
  self._to_send.put(self._WRITES_FINISHED, 0)
  self._closed = True

def wait(self, timeout=None):
# type: (Optional[int]) -> None

"""Blocks until reading has finished, or until `timeout` seconds elapse.

Delegates to `self._reads_finished.wait` (presumably a threading.Event —
confirm against __init__); returns None whether or not the wait timed out.
"""
self._reads_finished.wait(timeout)

def _receiving_queue(self, instruction_id):
# type: (str) -> Optional[queue.Queue[DataOrTimers]]
# type: (str) -> Optional[ByteLimitedQueue[DataOrTimers]]

"""
Gets or creates queue for a instruction_id. Or, returns None if the
Expand Down Expand Up @@ -585,21 +589,19 @@ def output_stream(self, instruction_id, transform_id):
def add_to_send_queue(data):
  # type: (bytes) -> None

  # Enqueues one Data element for `instruction_id`/`transform_id`, charging
  # the send queue for the payload size. Empty payloads are dropped.
  # NOTE: the scraped diff merged old and new statements here; this is the
  # coherent new-side code.
  if data:
    elem = beam_fn_api_pb2.Elements.Data(
        instruction_id=instruction_id, transform_id=transform_id, data=data)
    self._to_send.put(elem, self._get_element_size_bytes(elem))

def close_callback(data):
  # type: (bytes) -> None

  # Flushes any remaining buffered data, then sends the is_last marker so
  # the receiver knows this (instruction, transform) stream is finished.
  # NOTE: the scraped diff merged old and new statements here; this is the
  # coherent new-side code.
  add_to_send_queue(data)
  # End of stream marker.
  elem = beam_fn_api_pb2.Elements.Data(
      instruction_id=instruction_id,
      transform_id=transform_id,
      is_last=True)
  self._to_send.put(elem, self._get_element_size_bytes(elem))

return ClosableOutputStream.create(
close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
Expand All @@ -614,23 +616,23 @@ def output_timer_stream(
def add_to_send_queue(timer):
  # type: (bytes) -> None

  # Enqueues one Timers element for this timer family, charging the send
  # queue for the encoded-timer payload size. Empty payloads are dropped.
  # NOTE: the scraped diff merged old and new statements here; this is the
  # coherent new-side code.
  if timer:
    elem = beam_fn_api_pb2.Elements.Timers(
        instruction_id=instruction_id,
        transform_id=transform_id,
        timer_family_id=timer_family_id,
        timers=timer,
        is_last=False)
    self._to_send.put(elem, self._get_element_size_bytes(elem))

def close_callback(timer):
  # type: (bytes) -> None

  # Flushes any remaining buffered timers, then sends the is_last marker to
  # terminate this timer-family stream.
  # NOTE: the scraped diff merged old and new statements here; this is the
  # coherent new-side code.
  add_to_send_queue(timer)
  elem = beam_fn_api_pb2.Elements.Timers(
      instruction_id=instruction_id,
      transform_id=transform_id,
      timer_family_id=timer_family_id,
      is_last=True)
  self._to_send.put(elem, self._get_element_size_bytes(elem))

return ClosableOutputStream.create(
close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
Expand Down Expand Up @@ -665,6 +667,15 @@ def _write_outputs(self):
raise ValueError('Unexpected output element type %s' % type(stream))
yield beam_fn_api_pb2.Elements(data=data_stream, timers=timer_stream)

def _get_element_size_bytes(self, element):
  # type: (Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timers]) -> int

  """Returns the byte size used to account for `element` in a queue.

  Only the data/timers payload length is counted (proto envelope overhead
  is ignored); any other element type is treated as zero-sized.
  """
  if isinstance(element, beam_fn_api_pb2.Elements.Data):
    return len(element.data)
  if isinstance(element, beam_fn_api_pb2.Elements.Timers):
    return len(element.timers)
  return 0

def _read_inputs(self, elements_iterator):
# type: (Iterable[beam_fn_api_pb2.Elements]) -> None

Expand All @@ -691,7 +702,8 @@ def _put_queue(instruction_id, element):
next_discard_log_time = current_time + 10
return
try:
input_queue.put(element, timeout=1)
input_queue.put(
element, self._get_element_size_bytes(element), timeout=1)
return
except queue.Full:
current_time = time.time()
Expand Down
30 changes: 30 additions & 0 deletions sdks/python/apache_beam/utils/byte_limited_queue.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# cython: overflowcheck=True

# Cython declaration for apache_beam.utils.byte_limited_queue.ByteLimitedQueue.
# Attribute names and order mirror the pure-Python implementation.
# (Scraped review-thread text that had been interleaved here was removed.)
cdef class ByteLimitedQueue(object):
  # Limits; 0 or negative means "unlimited" for the corresponding dimension.
  cdef readonly Py_ssize_t max_elements
  cdef readonly Py_ssize_t max_bytes
  # Total bytes currently held in the queue.
  cdef readonly Py_ssize_t _byte_size
  cdef readonly object _mutex
  cdef readonly object _not_empty
  cdef readonly object _waiting_writers
  cdef readonly object _queue
  # Total bytes of items whose writers are blocked in put().
  cdef readonly Py_ssize_t _blocked_bytes

  cpdef bint _can_fit(self, Py_ssize_t item_bytes) except -1
195 changes: 195 additions & 0 deletions sdks/python/apache_beam/utils/byte_limited_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A thread-safe queue that limits capacity by total byte size."""

import collections
import queue
import threading
import time
import types


class ByteLimitedQueue(object):
  """A fair, thread-safe queue limited by element count and total byte size.

  Writers are served strictly in FIFO order: once one writer blocks, later
  writers line up behind it instead of stealing freed capacity. A single
  element is always admitted into an empty queue regardless of its size, so
  one oversized element cannot deadlock producers against consumers.

  (Scraped review-thread text that had been interleaved inside get() was
  removed; the logic is unchanged.)
  """
  # Allows ByteLimitedQueue[T] annotations, mirroring queue.Queue.
  __class_getitem__ = classmethod(types.GenericAlias)

  def __init__(
      self,
      maxsize=0,  # type: int
      maxbytes=0,  # type: int
  ):
    # type: (...) -> None

    """Initializes a ByteLimitedQueue.

    Args:
      maxsize: The maximum number of items allowed in the queue. If 0 or
        negative, there is no limit on the number of elements.
      maxbytes: The maximum accumulated bytes allowed in the queue. If 0 or
        negative, there is no limit on the total bytes of the elements.
    """
    self.max_elements = maxsize
    self.max_bytes = maxbytes
    # Total bytes of the elements currently stored in the queue.
    self._byte_size = 0
    # Total bytes of items whose writers are currently blocked in put().
    self._blocked_bytes = 0
    self._mutex = threading.Lock()
    self._not_empty = threading.Condition(self._mutex)
    # FIFO of per-writer Conditions (all sharing self._mutex); only the
    # writer at the head of this deque is allowed to insert its item.
    self._waiting_writers = collections.deque()
    # Stores (item, item_bytes) pairs.
    self._queue = collections.deque()

  def put(self, item, item_bytes, *, block=True, timeout=None):
    """Put an item into the queue.

    If the queue is full, block until a free slot is available, unless `block`
    is false or a timeout occurs.

    Args:
      item: The item to put into the queue.
      item_bytes: The size of the item.
      block: If True, block until space is available. If False, raise
        queue.Full immediately if the queue is full.
      timeout: If block is True, wait for at most `timeout` seconds. If None,
        block indefinitely.

    Raises:
      ValueError: If timeout or item_bytes is negative.
      queue.Full: If the queue is full and block is False or the timeout
        occurs.
    """
    if timeout is not None and timeout < 0:
      raise ValueError("'timeout' must be a non-negative number")
    if item_bytes < 0:
      raise ValueError("'item_bytes' must be a non-negative number")

    with self._mutex:
      # Fast path: no writer is already waiting (preserves FIFO fairness)
      # and the item fits within both limits.
      if not self._waiting_writers and self._can_fit(item_bytes):
        self._queue.append((item, item_bytes))
        self._byte_size += item_bytes
        self._not_empty.notify()
        return

      if not block:
        raise queue.Full

      # Slow path: wait on a private condition until this writer reaches
      # the head of the writer queue and its item fits.
      my_cond = threading.Condition(self._mutex)
      endtime = time.monotonic() + timeout if timeout is not None else None
      try:
        self._blocked_bytes += item_bytes
        self._waiting_writers.append(my_cond)
        while True:
          if timeout is None:
            my_cond.wait()
          else:
            remaining = endtime - time.monotonic()
            if remaining <= 0.0:
              raise queue.Full
            my_cond.wait(remaining)

          # Only the head writer may insert, and only once there is room.
          if self._waiting_writers[0] is my_cond and self._can_fit(item_bytes):
            break

        self._queue.append((item, item_bytes))
        self._byte_size += item_bytes
        self._not_empty.notify()
      finally:
        # Runs on both success and timeout: undo the blocked-bytes
        # accounting, dequeue ourselves, and hand off to the next writer.
        self._blocked_bytes -= item_bytes
        if self._waiting_writers:
          was_first = (self._waiting_writers[0] is my_cond)
          if was_first:
            self._waiting_writers.popleft()
          else:
            self._waiting_writers.remove(my_cond)
          if was_first and self._waiting_writers:
            self._waiting_writers[0].notify()

  def get(self, *, block=True, timeout=None):
    """Remove and return an item from the queue.

    If the queue is empty, block until an item is available, unless `block`
    is false or a timeout occurs.

    Args:
      block: If True, block until an item is available. If False, raise
        queue.Empty immediately if the queue is empty.
      timeout: If block is True, wait for at most `timeout` seconds. If None,
        block indefinitely.

    Returns:
      The item removed from the queue.

    Raises:
      ValueError: If timeout is negative.
      queue.Empty: If the queue is empty and block is False or the timeout
        occurs.
    """
    if timeout is not None and timeout < 0:
      raise ValueError("'timeout' must be a non-negative number")

    with self._mutex:
      if not block:
        if not self._queue:
          raise queue.Empty
      elif timeout is None:
        while not self._queue:
          self._not_empty.wait()
      else:
        endtime = time.monotonic() + timeout
        while not self._queue:
          remaining = endtime - time.monotonic()
          if remaining <= 0.0:
            raise queue.Empty
          self._not_empty.wait(remaining)

      item, item_bytes = self._queue.popleft()
      self._byte_size -= item_bytes

      # Capacity was just released: wake the head writer so it can re-check
      # whether its item now fits.
      if self._waiting_writers:
        self._waiting_writers[0].notify()

      return item

  def get_nowait(self):
    """Remove and return an item from the queue without blocking."""
    return self.get(block=False)

  def byte_size(self):
    """Return the total byte size of elements in the queue."""
    with self._mutex:
      return self._byte_size

  def blocked_byte_size(self):
    """Return the total byte size of elements in the queue that are blocked."""
    with self._mutex:
      return self._blocked_bytes

  def qsize(self):
    """Return the total number of elements in the queue."""
    with self._mutex:
      return len(self._queue)

  def _can_fit(self, item_bytes):
    # Must be called with self._mutex held.
    # Always let in a single element, regardless of size.
    if not self._queue:
      return True
    if self.max_elements > 0 and len(self._queue) >= self.max_elements:
      return False
    if self.max_bytes > 0 and self._byte_size + item_bytes > self.max_bytes:
      return False
    return True
Loading
Loading