Skip to content

Commit 7fdff86

Browse files
committed
new parameter to compute transfer hashes
1 parent 71a16ed commit 7fdff86

10 files changed

Lines changed: 64 additions & 12 deletions

File tree

src/aux/file/receive.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use fasthash::HasherExt;
2-
31
use crate::aux::{self, file};
2+
use fasthash::HasherExt;
43
use std::{
54
fs,
65
hash::Hash,
6+
hash::Hasher,
77
io::{Read, Write},
88
net,
99
os::unix::{self, fs::PermissionsExt},
@@ -156,7 +156,7 @@ where
156156
0 => {
157157
if 0 < cursor {
158158
if let Some(hasher) = hasher.as_mut() {
159-
buffer[..cursor].hash(hasher);
159+
hasher.write(&buffer[..cursor]);
160160
}
161161
file.write_all(&buffer[..cursor])?;
162162
}

src/aux/file/send.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
use fasthash::HasherExt;
2-
31
use crate::aux::{self, file};
2+
use fasthash::HasherExt;
43
use std::{
54
fs,
6-
hash::Hash,
5+
hash::{Hash, Hasher},
76
io::{Read, Write},
87
net,
98
os::unix::{self, fs::PermissionsExt},
@@ -108,7 +107,7 @@ where
108107
if 0 < cursor {
109108
total += cursor;
110109
if let Some(hasher) = hasher.as_mut() {
111-
buffer[..cursor].hash(hasher);
110+
hasher.write(&buffer[..cursor]);
112111
}
113112
diode.write_all(&buffer[..cursor])?;
114113
}

src/bin/diode-oneshot-receive.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ struct Args {
8383
min_repair: u32,
8484
#[clap(long, help = "Set CPU affinity for threads")]
8585
cpu_affinity: bool,
86+
#[clap(long, help = "Hash each client transfered data")]
87+
hash: bool,
8688
}
8789

8890
fn main() {
@@ -120,6 +122,7 @@ fn main() {
120122
heartbeat_interval: None,
121123
batch_receive: args.batch,
122124
cpu_affinity: args.cpu_affinity,
125+
hash: args.hash,
123126
},
124127
raptorq,
125128
|_| Ok::<_, io::Error>(io::stdout()),

src/bin/diode-oneshot-send.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ struct Args {
6565
repair: u32,
6666
#[clap(long, help = "Set CPU affinity for threads")]
6767
cpu_affinity: bool,
68+
#[clap(long, help = "Hash each client transfered data")]
69+
hash: bool,
6870
}
6971

7072
fn main() {
@@ -100,6 +102,7 @@ fn main() {
100102
to_mtu: args.to_mtu,
101103
batch_send: args.batch,
102104
cpu_affinity: args.cpu_affinity,
105+
hash: args.hash,
103106
},
104107
raptorq,
105108
) {

src/bin/diode-receive.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ struct Args {
127127
heartbeat: Option<time::Duration>,
128128
#[clap(long, help = "Set CPU affinity for threads")]
129129
cpu_affinity: bool,
130+
#[clap(long, help = "Hash each client transfered data")]
131+
hash: bool,
130132
}
131133

132134
enum Client {
@@ -210,6 +212,7 @@ fn main() {
210212
heartbeat_interval: args.heartbeat,
211213
batch_receive: args.batch,
212214
cpu_affinity: args.cpu_affinity,
215+
hash: args.hash,
213216
},
214217
raptorq,
215218
|_| Client::try_from(&args.to),

src/bin/diode-send.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ struct Args {
115115
repair: u32,
116116
#[clap(long, help = "Set CPU affinity for threads")]
117117
cpu_affinity: bool,
118+
#[clap(long, help = "Hash each client transfered data")]
119+
hash: bool,
118120
}
119121

120122
enum Client {
@@ -205,6 +207,7 @@ fn main() {
205207
to_mtu: args.to_mtu,
206208
batch_send: args.batch,
207209
cpu_affinity: args.cpu_affinity,
210+
hash: args.hash,
208211
},
209212
raptorq,
210213
) {

src/receive/client.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
//! Worker that writes decoded and reordered messages to client
22
33
use crate::{protocol, receive};
4+
use fasthash::HasherExt;
45
use std::{
6+
hash::Hasher,
57
io::{self, Write},
68
os::fd::AsRawFd,
79
thread,
@@ -26,6 +28,12 @@ where
2628

2729
let mut transmitted = 0;
2830

31+
let mut hasher = if receiver.config.hash {
32+
Some(fasthash::SpookyHasherExt::default())
33+
} else {
34+
None
35+
};
36+
2937
loop {
3038
let block = if let Some(timeout) = receiver.config.abort_timeout {
3139
recvq.recv_timeout(timeout).map_err(receive::Error::from)?
@@ -39,7 +47,13 @@ where
3947

4048
if !payload.is_empty() {
4149
log::trace!("client {client_id:x}: payload {} bytes", payload.len());
50+
51+
if let Some(hasher) = hasher.as_mut() {
52+
hasher.write(payload);
53+
}
54+
4255
transmitted += payload.len();
56+
4357
client.write_all(payload)?;
4458
if receiver.config.flush {
4559
client.flush()?;
@@ -58,9 +72,16 @@ where
5872
return Ok(());
5973
}
6074
protocol::BlockType::End => {
61-
log::info!(
62-
"client {client_id:x}: finished transfer, {transmitted} bytes transmitted"
63-
);
75+
if let Some(hasher) = hasher {
76+
let hash = hasher.finish_ext();
77+
log::info!(
78+
"client {client_id:x}: finished transfer, {transmitted} bytes transmitted, hash is {hash:x}"
79+
);
80+
} else {
81+
log::info!(
82+
"client {client_id:x}: finished transfer, {transmitted} bytes transmitted"
83+
);
84+
}
6485
client.flush()?;
6586
(receiver.client_end)(
6687
client.into_inner().map_err(|e| {

src/receive/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub struct Config {
4343
pub abort_timeout: Option<time::Duration>,
4444
pub heartbeat_interval: Option<time::Duration>,
4545
pub cpu_affinity: bool,
46+
pub hash: bool,
4647
}
4748

4849
pub enum Error {

src/send/client.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//! Worker that reads data from a client socket and split it into [`crate::protocol`] blocks
22
33
use crate::{protocol, send};
4-
use std::{io, os::fd::AsRawFd, thread};
4+
use fasthash::HasherExt;
5+
use std::{hash::Hasher, io, os::fd::AsRawFd, thread};
56

67
pub fn start<C>(
78
sender: &send::Sender<C>,
@@ -24,6 +25,12 @@ where
2425
let mut cursor = 0;
2526
let mut transmitted = 0;
2627

28+
let mut hasher = if sender.config.hash {
29+
Some(fasthash::SpookyHasherExt::default())
30+
} else {
31+
None
32+
};
33+
2734
loop {
2835
log::trace!("client {client_id:x}: read...");
2936

@@ -46,6 +53,10 @@ where
4653

4754
log::trace!("client {client_id:x}: send {cursor} bytes");
4855

56+
if let Some(hasher) = hasher.as_mut() {
57+
hasher.write(&buffer[..cursor]);
58+
}
59+
4960
sender.to_encoding.send(Some(protocol::Block::new(
5061
block_type,
5162
&sender.raptorq,
@@ -57,7 +68,14 @@ where
5768
cursor = 0;
5869

5970
if 0 == read {
60-
log::info!("client {client_id:x}: disconnect, {transmitted} bytes sent");
71+
if let Some(hasher) = hasher {
72+
let hash = hasher.finish_ext();
73+
log::info!(
74+
"client {client_id:x}: disconnect, {transmitted} bytes sent, hash is {hash:x}"
75+
);
76+
} else {
77+
log::info!("client {client_id:x}: disconnect, {transmitted} bytes sent");
78+
}
6179
return Ok(());
6280
}
6381

src/send/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct Config {
4444
pub to_mtu: u16,
4545
pub batch_send: Option<u32>,
4646
pub cpu_affinity: bool,
47+
pub hash: bool,
4748
}
4849

4950
pub enum Error {

0 commit comments

Comments
 (0)