Skip to content

Commit e7ee7c0

Browse files
committed
Chunk abstraction for all roles
1 parent 5c9f72d commit e7ee7c0

3 files changed

Lines changed: 2532 additions & 0 deletions

File tree

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//! Minimal dataflow over the `Vec`-backed `Chunk` container.
2+
//!
3+
//! Mirrors the `val` arm of `spines.rs`, but arranges through `ChunkBatcher` /
4+
//! `ChunkRcBuilder` / `ChunkSpine` — i.e. the merge batcher, builder, and spine
5+
//! built atop the `Chunk` trait and its `ChunkBatch`. Run as:
6+
//!
7+
//! ```text
8+
//! cargo run --release --example chunks -- <keys> <size>
9+
//! ```
10+
11+
use differential_dataflow::Hashable;
12+
use differential_dataflow::input::Input;
13+
use differential_dataflow::operators::arrange::Arrange;
14+
use differential_dataflow::operators::arrange::arrangement::arrange_core;
15+
use differential_dataflow::trace::chunk::vec_chunk::{ChunkBatcher, ChunkRcBuilder, ChunkSpine, VecChunk};
16+
use differential_dataflow::trace::chunk::col_chunk::{ColChunkBatcher, ColChunkRcBuilder, ColChunkSpine, ColChunker};
17+
use differential_dataflow::trace::implementations::Vector;
18+
use differential_dataflow::trace::implementations::chunker::ContainerChunker;
19+
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, OrdValSpine};
20+
21+
use timely::dataflow::channels::pact::Exchange;
22+
use timely::dataflow::operators::probe::Handle;
23+
24+
fn main() {
25+
let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap();
26+
let size: usize = std::env::args().nth(2).unwrap().parse().unwrap();
27+
// "chunk" (default): our `Chunk`-backed trace. "ord": the standard `ord_neu` trace.
28+
let mode: String = std::env::args().nth(3).unwrap_or_else(|| "chunk".to_string());
29+
println!("Running [{mode}] arrangement");
30+
31+
let timer = std::time::Instant::now();
32+
33+
// Skip the three positional args we consume (keys, size, mode); the rest are
34+
// timely's worker flags.
35+
timely::execute_from_args(std::env::args().skip(4), move |worker| {
36+
let mut probe = Handle::new();
37+
let (mut data_input, mut keys_input) = worker.dataflow(|scope| {
38+
let (data_input, data) = scope.new_collection::<u64, isize>();
39+
let (keys_input, keys) = scope.new_collection::<u64, isize>();
40+
let data = data.map(|x| (x, ()));
41+
let keys = keys.map(|x| (x, ()));
42+
43+
match mode.as_str() {
44+
"chunk" => {
45+
// The chunk batcher's output (`VecChunk`) differs from the stream
46+
// container (`Vec`), so this is a cross-container chunker case:
47+
// drop to `arrange_core` with an explicit `ContainerChunker<VecChunk>`.
48+
type Ba = ChunkBatcher<u64, (), u64, isize>;
49+
type Bu = ChunkRcBuilder<u64, (), u64, isize>;
50+
type Sp = ChunkSpine<u64, (), u64, isize>;
51+
type Chu = ContainerChunker<VecChunk<u64, (), u64, isize>>;
52+
let data = arrange_core::<_, _, Chu, Ba, Bu, Sp>(
53+
data.inner, Exchange::new(|u: &((u64, ()), u64, isize)| (u.0).0.hashed().into()), "Data");
54+
let keys = arrange_core::<_, _, Chu, Ba, Bu, Sp>(
55+
keys.inner, Exchange::new(|u: &((u64, ()), u64, isize)| (u.0).0.hashed().into()), "Keys");
56+
keys.join_core(data, |_k, &(), &()| Option::<()>::None).probe_with(&mut probe);
57+
}
58+
"colchunk" => {
59+
type L = Vector<((u64, ()), u64, isize)>;
60+
type Ba = ColChunkBatcher<L>;
61+
type Bu = ColChunkRcBuilder<L>;
62+
type Sp = ColChunkSpine<L>;
63+
type Chu = ColChunker<L>;
64+
let data = arrange_core::<_, _, Chu, Ba, Bu, Sp>(
65+
data.inner, Exchange::new(|u: &((u64, ()), u64, isize)| (u.0).0.hashed().into()), "Data");
66+
let keys = arrange_core::<_, _, Chu, Ba, Bu, Sp>(
67+
keys.inner, Exchange::new(|u: &((u64, ()), u64, isize)| (u.0).0.hashed().into()), "Keys");
68+
keys.join_core(data, |_k, &(), &()| Option::<()>::None).probe_with(&mut probe);
69+
}
70+
"ord" => {
71+
type Ba = OrdValBatcher<u64, (), u64, isize>;
72+
type Bu = RcOrdValBuilder<u64, (), u64, isize>;
73+
type Sp = OrdValSpine<u64, (), u64, isize>;
74+
let data = data.arrange::<Ba, Bu, Sp>();
75+
let keys = keys.arrange::<Ba, Bu, Sp>();
76+
keys.join_core(data, |_k, &(), &()| Option::<()>::None).probe_with(&mut probe);
77+
}
78+
other => panic!("unrecognized mode: {other:?} (expected `chunk`, `colchunk`, or `ord`)"),
79+
}
80+
81+
(data_input, keys_input)
82+
});
83+
84+
// Load `data`, advancing round by round.
85+
let mut counter = 0;
86+
let mut t: u64 = 1;
87+
while counter < 10 * keys {
88+
let mut i = worker.index();
89+
while i < size {
90+
data_input.insert(((counter + i) % keys) as u64);
91+
i += worker.peers();
92+
}
93+
counter += size;
94+
data_input.advance_to(t); data_input.flush();
95+
keys_input.advance_to(t); keys_input.flush();
96+
while probe.less_than(data_input.time()) { worker.step(); }
97+
t += 1;
98+
}
99+
println!("{:?}\tloading complete", timer.elapsed());
100+
101+
// Issue `keys` queries against the arranged `data`.
102+
let mut queries = 0;
103+
while queries < 10 * keys {
104+
let mut i = worker.index();
105+
while i < size {
106+
keys_input.insert(((queries + i) % keys) as u64);
107+
i += worker.peers();
108+
}
109+
queries += size;
110+
data_input.advance_to(t); data_input.flush();
111+
keys_input.advance_to(t); keys_input.flush();
112+
while probe.less_than(keys_input.time()) { worker.step(); }
113+
t += 1;
114+
}
115+
println!("{:?}\tqueries complete", timer.elapsed());
116+
}).unwrap();
117+
118+
println!("{:?}\tshut down", timer.elapsed());
119+
}

0 commit comments

Comments
 (0)