diff --git a/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py b/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py index de018e5a3ce..ceee764c1b8 100644 --- a/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py +++ b/amber/src/main/python/core/architecture/sendsemantics/hash_based_shuffle_partitioner.py @@ -37,18 +37,8 @@ def __init__(self, partitioning: HashBasedShufflePartitioning): super().__init__(set_one_of(Partitioning, partitioning)) logger.debug(f"got {partitioning}") self.batch_size = partitioning.batch_size - # Partitioning contains an ordered list of downstream worker ids. - # Currently we are using the index of such an order to choose - # a downstream worker to send tuples to. - # Must use dict.fromkeys to ensure the order of receiver workers - # from partitioning is preserved (using `{}` to create a set - # does not preserve order and will not work correctly.) - self.receivers = [ - (rid, []) - for rid in dict.fromkeys( - channel.to_worker_id for channel in partitioning.channels - ) - ] + # Indexed by hash_code to choose the downstream worker to send to. + self.receivers = self.build_receiver_batches(partitioning.channels) self.hash_attribute_names = partitioning.hash_attribute_names @overrides diff --git a/amber/src/main/python/core/architecture/sendsemantics/partitioner.py b/amber/src/main/python/core/architecture/sendsemantics/partitioner.py index c4aac57cbe4..ed2332d5cba 100644 --- a/amber/src/main/python/core/architecture/sendsemantics/partitioner.py +++ b/amber/src/main/python/core/architecture/sendsemantics/partitioner.py @@ -32,6 +32,18 @@ class Partitioner(ABC): def __init__(self, partitioning: Message): self.partitioning: Partitioning = get_one_of(partitioning) + @staticmethod + def build_receiver_batches( + channels, + ) -> typing.List[typing.Tuple[ActorVirtualIdentity, typing.List[Tuple]]]: + # An ordered (receiver, batch) pair per distinct downstream worker. + # dict.fromkeys preserves the channel order; a set literal would not, + # which breaks input-port materialization reader threads. + return [ + (rid, []) + for rid in dict.fromkeys(channel.to_worker_id for channel in channels) + ] + def add_tuple_to_batch( self, tuple_: Tuple ) -> Iterator[typing.Tuple[ActorVirtualIdentity, typing.List[Tuple]]]: diff --git a/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py b/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py index 28aff35935f..3f8f83c504b 100644 --- a/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py +++ b/amber/src/main/python/core/architecture/sendsemantics/range_based_shuffle_partitioner.py @@ -37,18 +37,8 @@ def __init__(self, partitioning: RangeBasedShufflePartitioning): super().__init__(set_one_of(Partitioning, partitioning)) logger.info(f"got {partitioning}") self.batch_size = partitioning.batch_size - # Partitioning contains an ordered list of downstream worker ids. - # Currently we are using the index of such an order to choose - # a downstream worker to send tuples to. - # Must use dict.fromkeys to ensure the order of receiver workers - # from partitioning is preserved (using `{}` to create a set - # does not preserve order and will not work correctly.) - self.receivers = [ - (rid, []) - for rid in dict.fromkeys( - channel.to_worker_id for channel in partitioning.channels - ) - ] + # Indexed by get_receiver_index to choose the downstream worker to send to. + self.receivers = self.build_receiver_batches(partitioning.channels) self.range_attribute_names = partitioning.range_attribute_names self.range_min = partitioning.range_min self.range_max = partitioning.range_max diff --git a/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py b/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py index 87c3fee87d8..21c1a3ca0f4 100644 --- a/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py +++ b/amber/src/main/python/core/architecture/sendsemantics/round_robin_partitioner.py @@ -35,19 +35,8 @@ class RoundRobinPartitioner(Partitioner): def __init__(self, partitioning: RoundRobinPartitioning): super().__init__(set_one_of(Partitioning, partitioning)) self.batch_size = partitioning.batch_size - # Partitioning contains an ordered list of downstream worker ids. - # Currently we are using the index of such an order to choose - # a downstream worker to send tuples to. - # Must use dict.fromkeys to ensure the order of receiver workers - # from partitioning is preserved (using `{}` to create a set - # does not preserve order and will not work with input-port - # materialization reader threads.) - self.receivers = [ - (rid, []) - for rid in dict.fromkeys( - channel.to_worker_id for channel in partitioning.channels - ) - ] + # Indexed by round_robin_index to choose the downstream worker to send to. + self.receivers = self.build_receiver_batches(partitioning.channels) self.round_robin_index = 0 @overrides