Skip to content

Commit 69196d2

Browse files
committed
send: minor improvements
1 parent 1cc614c commit 69196d2

5 files changed

Lines changed: 80 additions & 93 deletions

File tree

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ mod sock_utils;
2626
#[allow(unsafe_code)]
2727
mod udp;
2828

29+
pub const MAX_MTU: u16 = 9000;
30+
2931
#[derive(Clone, Copy, clap::ValueEnum)]
3032
pub enum RecvMode {
3133
#[cfg(feature = "receive-native")]

src/protocol.rs

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -257,45 +257,27 @@ impl Block {
257257
client_id: ClientId,
258258
data: Option<&[u8]>,
259259
) -> Result<Self, Error> {
260-
match data {
261-
None => {
262-
let mut content = vec![
263-
0u8;
264-
usize::try_from(raptorq.transfer_length).map_err(|e| {
265-
Error::Other(format!("transfer_length: {e}"))
266-
})?
267-
];
268-
let bytes = client_id.to_le_bytes();
269-
content[0] = bytes[0];
270-
content[1] = bytes[1];
271-
content[2] = bytes[2];
272-
content[3] = bytes[3];
273-
content[4] = block.serialized();
274-
Ok(Self(content))
275-
}
276-
Some(data) => {
277-
let mut content = Vec::with_capacity(
278-
usize::try_from(raptorq.transfer_length)
279-
.map_err(|e| Error::Other(format!("transfer_length: {e}")))?,
280-
);
281-
content.extend_from_slice(&client_id.to_le_bytes());
282-
content.push(block.serialized());
283-
content.extend_from_slice(&u32::to_le_bytes(
284-
u32::try_from(data.len())
285-
.map_err(|e| Error::Other(format!("data.len(): {e}")))?,
286-
));
287-
content.extend_from_slice(data);
288-
if content.len() < content.capacity() {
289-
content.resize(content.capacity(), 0);
290-
}
291-
Ok(Self(content))
292-
}
260+
let mut content = vec![
261+
0u8;
262+
usize::try_from(raptorq.transfer_length)
263+
.map_err(|e| Error::Other(format!("transfer_length: {e}")))?
264+
];
265+
content[0..4].copy_from_slice(&client_id.to_le_bytes());
266+
content[4] = block.serialized();
267+
268+
if let Some(data) = data {
269+
let data_len = data.len();
270+
content[5..9].copy_from_slice(&u32::to_le_bytes(
271+
u32::try_from(data_len).map_err(|e| Error::Other(format!("data.len(): {e}")))?,
272+
));
273+
content[9..9 + data_len].copy_from_slice(data);
293274
}
275+
276+
Ok(Self(content))
294277
}
295278

296279
pub(crate) fn client_id(&self) -> ClientId {
297-
let bytes = [self.0[0], self.0[1], self.0[2], self.0[3]];
298-
u32::from_le_bytes(bytes)
280+
u32::from_le_bytes([self.0[0], self.0[1], self.0[2], self.0[3]])
299281
}
300282

301283
pub(crate) fn block_type(&self) -> Result<BlockType, Error> {
@@ -310,8 +292,7 @@ impl Block {
310292
}
311293

312294
fn payload_len(&self) -> u32 {
313-
let data_len_bytes = [self.0[5], self.0[6], self.0[7], self.0[8]];
314-
u32::from_le_bytes(data_len_bytes)
295+
u32::from_le_bytes([self.0[5], self.0[6], self.0[7], self.0[8]])
315296
}
316297

317298
pub(crate) const fn deserialize(data: Vec<u8>) -> Self {

src/receive/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ where
187187
let multiplex_control = semka::Sem::new(config.max_clients)
188188
.ok_or(Error::Other("failed to create semaphore".into()))?;
189189

190+
if config.from_mtu > crate::MAX_MTU {
191+
return Err(Error::Other(format!("mtu {} is too large (> {})", config.from_mtu, crate::MAX_MTU)));
192+
}
193+
190194
let block_to_dispatch = (sync::Mutex::new(0), sync::Condvar::new());
191195

192196
let (to_reblock, for_reblock) = crossbeam_channel::unbounded();

src/send/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ where
141141
let multiplex_control = semka::Sem::new(config.max_clients)
142142
.ok_or(Error::Other("failed to create semaphore".into()))?;
143143

144+
if config.to_mtu > crate::MAX_MTU {
145+
return Err(Error::Other(format!("mtu {} is too large (> {})", config.to_mtu, crate::MAX_MTU)));
146+
}
147+
144148
let (to_server, for_server) = crossbeam_channel::bounded(1);
145149
let (to_ordering, for_ordering) = crossbeam_channel::bounded(config.to_ports.len());
146150
let (to_udp, for_udp) = crossbeam_channel::bounded(config.to_ports.len());

src/udp.rs

Lines changed: 52 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,16 @@ pub struct ReceiveMmsg<'a> {
122122
#[cfg(feature = "receive-mmsg")]
123123
impl ReceiveMmsg<'_> {
124124
fn new(socket: i32, udp_packet_size: u16) -> Self {
125-
let batch_size = MAX_BATCH_SIZE as usize;
126-
let iovecs = vec![unsafe { mem::zeroed::<libc::iovec>() }; batch_size];
125+
let iovecs = vec![unsafe { mem::zeroed::<libc::iovec>() }; MAX_BATCH_SIZE as usize];
127126
let mut iovecs = pin::Pin::new(iovecs);
128127

129-
let mut mmsghdr = vec![unsafe { mem::zeroed::<libc::mmsghdr>() }; batch_size];
130-
for i in 0..batch_size {
128+
let mut mmsghdr = vec![unsafe { mem::zeroed::<libc::mmsghdr>() }; MAX_BATCH_SIZE as usize];
129+
for i in 0..MAX_BATCH_SIZE as usize {
131130
mmsghdr[i].msg_hdr.msg_iov = &raw mut iovecs[i];
132131
mmsghdr[i].msg_hdr.msg_iovlen = 1;
133132
}
134133

135-
let mut buffers = vec![pin::Pin::new(vec![0u8; udp_packet_size as usize]); batch_size];
134+
let mut buffers = vec![pin::Pin::new(vec![0u8; udp_packet_size as usize]); MAX_BATCH_SIZE as usize];
136135

137136
for (i, buffer) in buffers.iter_mut().enumerate() {
138137
iovecs[i].iov_base = buffer.as_mut_ptr().cast::<libc::c_void>();
@@ -339,13 +338,12 @@ impl<'a> Send<'a> {
339338
io::Error::new(io::ErrorKind::InvalidData, format!("dest_len: {e}"))
340339
})?;
341340

342-
let batch_size = MAX_BATCH_SIZE as usize;
343-
let iovecs = vec![unsafe { mem::zeroed::<libc::iovec>() }; batch_size];
341+
let iovecs = vec![unsafe { mem::zeroed::<libc::iovec>() }; MAX_BATCH_SIZE as usize];
344342
let mut iovecs = pin::Pin::new(iovecs);
345343

346-
let mut mmsghdr = vec![unsafe { mem::zeroed::<libc::mmsghdr>() }; batch_size];
344+
let mut mmsghdr = vec![unsafe { mem::zeroed::<libc::mmsghdr>() }; MAX_BATCH_SIZE as usize];
347345

348-
for i in 0..batch_size {
346+
for i in 0..MAX_BATCH_SIZE as usize {
349347
mmsghdr[i].msg_hdr.msg_name = raw_dest.cast::<libc::c_void>();
350348
mmsghdr[i].msg_hdr.msg_namelen = dest_len;
351349
mmsghdr[i].msg_hdr.msg_iov = &raw mut iovecs[i];
@@ -364,72 +362,69 @@ impl<'a> Send<'a> {
364362
}
365363

366364
pub fn send(&mut self, packets: &[raptorq::EncodingPacket]) -> Result<(), io::Error> {
367-
let mut datagrams = packets.iter().map(raptorq::EncodingPacket::serialize);
365+
let datagrams = packets.iter().map(raptorq::EncodingPacket::serialize);
368366

369367
match self {
370368
#[cfg(feature = "send-native")]
371-
Self::Native { socket, dest } => datagrams.try_for_each(|datagram| {
372-
let len = datagram.len();
369+
Self::Native { socket, dest } => {
370+
for datagram in datagrams {
371+
let len = datagram.len();
373372

374-
let sent = socket.send_to(&datagram, *dest)?;
373+
let sent = socket.send_to(&datagram, *dest)?;
375374

376-
if sent == len {
377-
Ok(())
378-
} else {
379-
Err(io::Error::other(format!(
380-
"libc::sendmsg failed {sent} != {len}"
381-
)))
375+
if sent != len {
376+
return Err(io::Error::other(format!(
377+
"libc::sendmsg failed {sent} != {len}"
378+
)));
379+
}
382380
}
383-
}),
381+
}
384382
#[cfg(feature = "send-msg")]
385383
Self::Msg {
386384
socket,
387385
msghdr,
388386
iovec,
389387
..
390-
} => datagrams.try_for_each(|mut datagram| {
391-
let len = datagram.len();
388+
} => {
389+
for mut datagram in datagrams {
390+
let len = datagram.len();
392391

393-
iovec.iov_base = datagram.as_mut_ptr().cast();
394-
iovec.iov_len = len;
392+
iovec.iov_base = datagram.as_mut_ptr().cast();
393+
iovec.iov_len = len;
395394

396-
let sent = unsafe { libc::sendmsg(*socket, msghdr, 0) };
395+
let sent = unsafe { libc::sendmsg(*socket, msghdr, 0) };
397396

398-
if sent == len.cast_signed() {
399-
Ok(())
400-
} else {
401-
Err(io::Error::other(format!(
402-
"libc::sendmsg failed {sent} != {len}"
403-
)))
397+
if sent != len.cast_signed() {
398+
return Err(io::Error::other(format!(
399+
"libc::sendmsg failed {sent} != {len}"
400+
)));
401+
}
404402
}
405-
}),
403+
}
406404
#[cfg(feature = "send-mmsg")]
407405
Self::Mmsg {
408406
socket,
409407
mmsghdr,
410408
iovecs,
411409
..
412-
} => datagrams
413-
.collect::<Vec<_>>()
414-
.chunks_mut(MAX_BATCH_SIZE as usize)
415-
.try_for_each(|datagrams| {
410+
} => {
411+
for datagrams in datagrams
412+
.collect::<Vec<_>>()
413+
.chunks_mut(MAX_BATCH_SIZE as usize)
414+
{
416415
let to_send = datagrams.len();
417416

418-
datagrams
419-
.iter_mut()
420-
.enumerate()
421-
.try_for_each(|(i, datagram)| {
422-
mmsghdr[i].msg_len = u32::try_from(datagram.len())?;
423-
iovecs[i].iov_base = datagram.as_mut_ptr().cast::<libc::c_void>();
424-
iovecs[i].iov_len = datagram.len();
425-
Ok(())
426-
})
427-
.map_err(|e: num::TryFromIntError| {
428-
io::Error::new(
429-
io::ErrorKind::InvalidData,
430-
format!("datagram.len(): {e}"),
431-
)
432-
})?;
417+
for (i, datagram) in datagrams.iter_mut().enumerate() {
418+
mmsghdr[i].msg_len =
419+
u32::try_from(datagram.len()).map_err(|e: num::TryFromIntError| {
420+
io::Error::new(
421+
io::ErrorKind::InvalidData,
422+
format!("datagram.len(): {e}"),
423+
)
424+
})?;
425+
iovecs[i].iov_base = datagram.as_mut_ptr().cast::<libc::c_void>();
426+
iovecs[i].iov_len = datagram.len();
427+
}
433428

434429
let sent = unsafe {
435430
libc::sendmmsg(
@@ -442,12 +437,13 @@ impl<'a> Send<'a> {
442437
) as isize
443438
};
444439

445-
if sent.cast_unsigned() == to_send {
446-
Ok(())
447-
} else {
448-
Err(io::Error::other("libc::sendmmsg"))
440+
if sent.cast_unsigned() != to_send {
441+
return Err(io::Error::other("libc::sendmmsg"));
449442
}
450-
}),
443+
}
444+
}
451445
}
446+
447+
Ok(())
452448
}
453449
}

0 commit comments

Comments
 (0)