Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions differential-dataflow/examples/columnar_spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use differential_dataflow::columnar::spill::{Entry, Fetch, Spill, SpillPolicy};
use differential_dataflow::columnar::updates::{Updates, UpdatesTyped};
use differential_dataflow::logging::Logger;
use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::trace::{Batcher, Builder};
use differential_dataflow::trace::{Batcher, Description};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::probe::{Handle as ProbeHandle, Probe};
use timely::dataflow::operators::Input;
Expand Down Expand Up @@ -343,11 +343,8 @@ where
Self(inner)
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
&mut self,
upper: Antichain<T>,
) -> B::Output {
self.0.seal::<B>(upper)
fn seal(&mut self, upper: Antichain<T>) -> (Vec<Self::Output>, Description<T>) {
self.0.seal(upper)
}

fn frontier(&mut self) -> AntichainRef<'_, T> {
Expand Down
7 changes: 3 additions & 4 deletions differential-dataflow/src/columnar/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use timely::progress::{frontier::Antichain, Timestamp};
use timely::container::PushInto;

use crate::logging::Logger;
use crate::trace::{Batcher, Builder, Description};
use crate::trace::{Batcher, Description};

use super::layout::ColumnarUpdate as Update;
use super::updates::UpdatesTyped;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<U::Time>) -> B::Output {
fn seal(&mut self, upper: Antichain<U::Time>) -> (Vec<Self::Output>, Description<U::Time>) {
// Merge all remaining chains into a single chain.
while self.chains.len() > 1 {
let list1 = self.chains.pop().unwrap();
Expand Down Expand Up @@ -93,9 +93,8 @@ impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
}

let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(U::Time::minimum()));
let seal = B::seal(&mut readied, description);
self.lower = upper;
seal
(readied, description)
}

/// The frontier of elements remaining after the most recent call to `self.seal`.
Expand Down
9 changes: 6 additions & 3 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
while let Some(key) = cursor.get_key(batch) {
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {

Check warning on line 199 in differential-dataflow/src/operators/arrange/arrangement.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
});
}
Expand Down Expand Up @@ -457,7 +457,8 @@
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal::<Bu>(upper.clone());
let (mut chain, description) = batcher.seal(upper.clone());
let batch = Bu::seal(&mut chain, description);

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand All @@ -484,8 +485,10 @@
capabilities = new_capabilities;
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal::<Bu>(frontier.frontier().to_owned());
// Announce progress updates, even without data. We seal the batcher to
// advance its lower bound and frontier, but discard the readied updates
// rather than building a batch we would immediately drop.
let _ = batcher.seal(frontier.frontier().to_owned());
writer.seal(frontier.frontier().to_owned());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use timely::progress::{frontier::Antichain, Timestamp};
use timely::container::PushInto;

use crate::logging::{BatcherEvent, Logger};
use crate::trace::{Batcher, Builder, Description};
use crate::trace::{Batcher, Description};

/// Creates batches from chunks of sorted, consolidated tuples.
pub struct MergeBatcher<M: Merger> {
Expand Down Expand Up @@ -58,7 +58,7 @@ where
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
fn seal(&mut self, upper: Antichain<M::Time>) -> (Vec<Self::Output>, Description<M::Time>) {
// Merge all remaining chains into a single chain.
while self.chains.len() > 1 {
let list1 = self.chain_pop().unwrap();
Expand All @@ -82,9 +82,8 @@ where
self.stash.clear();

let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
let seal = B::seal(&mut readied, description);
self.lower = upper;
seal
(readied, description)
}

/// The frontier of elements remaining after the most recent call to `self.seal`.
Expand Down
7 changes: 5 additions & 2 deletions differential-dataflow/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,11 @@ pub trait Batcher: PushInto<Self::Output> {
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new(logger: Option<Logger>, operator_id: usize) -> Self;
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
/// Returns all updates not greater or equal to an element of `upper`, as a sorted and
/// consolidated chain together with the description that bounds them.
///
/// The returned chain is suitable to hand directly to [`Builder::seal`].
fn seal(&mut self, upper: Antichain<Self::Time>) -> (Vec<Self::Output>, Description<Self::Time>);
/// Returns the lower envelope of contained update times.
fn frontier(&mut self) -> AntichainRef<'_, Self::Time>;
}
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/tests/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use timely::dataflow::operators::generic::OperatorInfo;
use timely::progress::{Antichain, frontier::AntichainRef};

use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine};
use differential_dataflow::trace::{Trace, TraceReader, Batcher};
use differential_dataflow::trace::{Trace, TraceReader, Batcher, Builder};
use differential_dataflow::trace::cursor::Cursor;

type IntegerTrace = ValSpine<u64, u64, usize, i64>;
Expand All @@ -22,9 +22,9 @@ fn get_trace() -> ValSpine<u64, u64, usize, i64> {
]);

let batch_ts = &[1, 2, 3];
let batches = batch_ts.iter().map(move |i| batcher.seal::<IntegerBuilder>(Antichain::from_elem(*i)));
for b in batches {
trace.insert(b);
for i in batch_ts {
let (mut chain, description) = batcher.seal(Antichain::from_elem(*i));
trace.insert(IntegerBuilder::seal(&mut chain, description));
}
}
trace
Expand Down
Loading