Skip to content

Commit 72b713f

Browse files
committed
replace encoding/decoding spin locks with mutex+condvar
1 parent 6069019 commit 72b713f

6 files changed

Lines changed: 55 additions & 37 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,3 @@ enum-glob-use = "allow"
3535
missing-errors-doc = "allow"
3636
nursery = { level = "deny", priority = -1 }
3737
or_fun_call = "allow"
38-
significant_drop_tightening = "allow"

src/receive/decode.rs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
//! Worker that decodes `RaptorQ` packets into protocol blocks
22
33
use crate::{protocol, receive};
4-
use std::{sync, thread};
4+
use std::thread;
55

66
pub fn start<ClientNew, ClientEnd>(
77
receiver: &receive::Receiver<ClientNew, ClientEnd>,
88
) -> Result<(), receive::Error> {
99
loop {
1010
match receiver.for_decode.recv()? {
1111
super::Reassembled::Block { id, packets } => {
12+
log::debug!("received block {id} to decode");
13+
1214
match receiver.raptorq.decode(id, packets) {
1315
None => {
1416
log::error!("lost block {id} (failed to decode)");
@@ -17,23 +19,32 @@ pub fn start<ClientNew, ClientEnd>(
1719
Some(block) => {
1820
log::debug!("block {id} decoded with {} bytes!", block.len());
1921

20-
'inner: loop {
21-
let block_to_dispatch = receiver
22-
.block_to_dispatch
23-
.load(sync::atomic::Ordering::SeqCst);
24-
25-
if block_to_dispatch == id {
26-
receiver
27-
.to_dispatch
28-
.send(Some(protocol::Block::deserialize(block)))?;
29-
receiver
30-
.block_to_dispatch
31-
.fetch_add(1, sync::atomic::Ordering::SeqCst);
32-
break 'inner;
33-
}
34-
35-
thread::yield_now();
36-
}
22+
let mut block_to_dispatch =
23+
receiver.block_to_dispatch.0.lock().map_err(|e| {
24+
receive::Error::Other(format!(
25+
"failed to acquire block_to_dispatch mutex: {e}"
26+
))
27+
})?;
28+
29+
block_to_dispatch = receiver
30+
.block_to_dispatch
31+
.1
32+
.wait_while(block_to_dispatch, |block_to_dispatch| {
33+
*block_to_dispatch != id
34+
})
35+
.map_err(|e| {
36+
receive::Error::Other(format!(
37+
"failed to wait_while block_to_dispatch mutex: {e}"
38+
))
39+
})?;
40+
41+
receiver
42+
.to_dispatch
43+
.send(Some(protocol::Block::deserialize(block)))?;
44+
45+
*block_to_dispatch = block_to_dispatch.wrapping_add(1);
46+
drop(block_to_dispatch);
47+
receiver.block_to_dispatch.1.notify_all();
3748
}
3849
}
3950
}

src/receive/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ pub struct Receiver<ClientNew, ClientEnd> {
148148
config: Config,
149149
raptorq: protocol::RaptorQ,
150150
multiplex_control: semka::Sem,
151-
block_to_dispatch: sync::atomic::AtomicU8,
151+
block_to_dispatch: (sync::Mutex<u8>, sync::Condvar),
152152
to_reblock: crossbeam_channel::Sender<crate::udp::Datagrams>,
153153
for_reblock: crossbeam_channel::Receiver<crate::udp::Datagrams>,
154154
to_decode: crossbeam_channel::Sender<Reassembled>,
@@ -187,7 +187,7 @@ where
187187
let multiplex_control = semka::Sem::new(config.max_clients)
188188
.ok_or(Error::Other("failed to create semaphore".into()))?;
189189

190-
let block_to_dispatch = sync::atomic::AtomicU8::new(0);
190+
let block_to_dispatch = (sync::Mutex::new(0), sync::Condvar::new());
191191

192192
let (to_reblock, for_reblock) = crossbeam_channel::unbounded();
193193
let (to_decode, for_decode) = crossbeam_channel::unbounded();

src/receive/reblock.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! reordering
33
44
use crate::{receive, udp};
5-
use std::{mem, sync, thread};
5+
use std::{mem, thread};
66

77
pub const WINDOW_WIDTH: u8 = u8::MAX / 2;
88

@@ -63,9 +63,12 @@ pub fn start<ClientNew, ClientEnd>(
6363
let packet = raptorq::EncodingPacket::deserialize(first_datagram);
6464
cur_id = packet.payload_id().source_block_number();
6565

66-
receiver
67-
.block_to_dispatch
68-
.store(cur_id, sync::atomic::Ordering::SeqCst);
66+
let mut block_to_dispatch = receiver.block_to_dispatch.0.lock().map_err(|e| {
67+
receive::Error::Other(format!("failed to acquire block_to_dispatch mutex: {e}"))
68+
})?;
69+
*block_to_dispatch = cur_id;
70+
drop(block_to_dispatch);
71+
receiver.block_to_dispatch.1.notify_all();
6972

7073
let mut id = cur_id;
7174
let last = id.wrapping_add(WINDOW_WIDTH);

src/send/encoding.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! Worker that encodes protocol blocks into `RaptorQ` packets
22
33
use crate::send;
4-
use std::{sync::atomic, thread};
54

65
pub fn start<C>(sender: &send::Sender<C>) -> Result<(), send::Error> {
76
loop {
@@ -16,16 +15,22 @@ pub fn start<C>(sender: &send::Sender<C>) -> Result<(), send::Error> {
1615

1716
let packets = sender.raptorq.encode(block_id, block.serialized());
1817

19-
'inner: loop {
20-
let block_to_send = sender.block_to_send.load(atomic::Ordering::SeqCst);
18+
let mut block_to_send = sender.block_to_send.0.lock().map_err(|e| {
19+
send::Error::Other(format!("failed to acquire block_to_send mutex: {e}"))
20+
})?;
2121

22-
if block_to_send == block_id {
23-
sender.to_send.send(Some(packets))?;
24-
sender.block_to_send.fetch_add(1, atomic::Ordering::SeqCst);
25-
break 'inner;
26-
}
22+
block_to_send = sender
23+
.block_to_send
24+
.1
25+
.wait_while(block_to_send, |block_to_send| *block_to_send != block_id)
26+
.map_err(|e| {
27+
send::Error::Other(format!("failed to wait_while block_to_send mutex: {e}"))
28+
})?;
2729

28-
thread::yield_now();
29-
}
30+
sender.to_send.send(Some(packets))?;
31+
32+
*block_to_send = block_to_send.wrapping_add(1);
33+
drop(block_to_send);
34+
sender.block_to_send.1.notify_all();
3035
}
3136
}

src/send/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub struct Sender<C> {
117117
raptorq: protocol::RaptorQ,
118118
multiplex_control: semka::Sem,
119119
block_to_encode: sync::atomic::AtomicU8,
120-
block_to_send: sync::atomic::AtomicU8,
120+
block_to_send: (sync::Mutex<u8>, sync::Condvar),
121121
to_server: crossbeam_channel::Sender<Option<C>>,
122122
for_server: crossbeam_channel::Receiver<Option<C>>,
123123
to_encoding: crossbeam_channel::Sender<Option<(u8, protocol::Block)>>,
@@ -139,7 +139,7 @@ where
139139
.ok_or(Error::Other("failed to create semaphore".into()))?;
140140

141141
let block_to_encode = sync::atomic::AtomicU8::new(0);
142-
let block_to_send = sync::atomic::AtomicU8::new(0);
142+
let block_to_send = (sync::Mutex::new(0), sync::Condvar::new());
143143

144144
let (to_server, for_server) = crossbeam_channel::bounded(1);
145145
let (to_encoding, for_encoding) =

0 commit comments

Comments
 (0)