Skip to content

Commit e44e471

Browse files
committed
utils: add Windows eventfd implementation backed by manual-reset Event
Emulate eventfd on Windows using a manual-reset kernel Event object paired with a Mutex-protected counter. To maximize VMM throughput, the `write` path only trigger the event when the counter transitions from `0 -> non-zero`. If a virtual device rapid-fires multiple interrupts before the vCPU wakes up, we accumulate the data in user-space RAM and skip the redundant kernel syscalls entirely. `read` and `wait_timeout` maintain strict level-triggered synchronization. The kernel event is only reset (`ResetEvent`) when the internal counter is fully drained, preventing the IOCP epoll loop from entering an infinite busy-wait cycle. Signed-off-by: lstocchi <lstocchi@redhat.com>
1 parent a102a77 commit e44e471

3 files changed

Lines changed: 273 additions & 0 deletions

File tree

src/utils/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ pub use macos::eventfd;
2020
pub mod windows;
2121
#[cfg(target_os = "windows")]
2222
pub use windows::epoll;
23+
#[cfg(target_os = "windows")]
24+
pub use windows::eventfd;
2325
pub mod pollable_channel;
2426
#[cfg(target_arch = "x86_64")]
2527
pub mod rand;

src/utils/src/windows/eventfd.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
//! Structure and wrapper functions emulating eventfd using a Windows manual-reset Event object.
2+
3+
use std::sync::{Arc, Mutex};
4+
use std::{io, result};
5+
6+
use windows_sys::Win32::Foundation::{CloseHandle, HANDLE, WAIT_OBJECT_0};
7+
use windows_sys::Win32::System::Threading::{
8+
CreateEventW, ResetEvent, SetEvent, WaitForSingleObject, INFINITE,
9+
};
10+
11+
use super::{AsRawFd, RawFd};
12+
13+
pub const EFD_NONBLOCK: i32 = 1;
14+
pub const EFD_SEMAPHORE: i32 = 2;
15+
16+
#[derive(Debug)]
17+
struct Inner {
18+
event: HANDLE,
19+
counter: Mutex<u64>,
20+
nonblock: bool,
21+
semaphore: bool,
22+
}
23+
24+
// The HANDLE is a Windows kernel object usable from any thread.
25+
unsafe impl Send for Inner {}
26+
unsafe impl Sync for Inner {}
27+
28+
impl Drop for Inner {
29+
fn drop(&mut self) {
30+
unsafe {
31+
CloseHandle(self.event);
32+
}
33+
}
34+
}
35+
36+
#[derive(Clone, Debug)]
37+
pub struct EventFd {
38+
inner: Arc<Inner>,
39+
}
40+
41+
impl EventFd {
42+
pub fn new(flag: i32) -> result::Result<EventFd, io::Error> {
43+
let event = unsafe {
44+
CreateEventW(
45+
std::ptr::null(),
46+
1, // bManualReset = TRUE
47+
0, // bInitialState = FALSE (non-signaled)
48+
std::ptr::null(),
49+
)
50+
};
51+
if event.is_null() {
52+
return Err(io::Error::last_os_error());
53+
}
54+
55+
Ok(EventFd {
56+
inner: Arc::new(Inner {
57+
event,
58+
counter: Mutex::new(0),
59+
nonblock: (flag & EFD_NONBLOCK) != 0,
60+
semaphore: (flag & EFD_SEMAPHORE) != 0,
61+
}),
62+
})
63+
}
64+
65+
pub fn write(&self, v: u64) -> result::Result<(), io::Error> {
66+
let mut counter = self.inner.counter.lock().unwrap();
67+
68+
let was_zero = *counter == 0;
69+
*counter = counter.saturating_add(v);
70+
71+
// Only signal the event if it was not already signaled.
72+
if was_zero {
73+
if unsafe { SetEvent(self.inner.event) } == 0 {
74+
return Err(io::Error::last_os_error());
75+
}
76+
}
77+
Ok(())
78+
}
79+
80+
pub fn read(&self) -> result::Result<u64, io::Error> {
81+
loop {
82+
{
83+
let mut counter = self.inner.counter.lock().unwrap();
84+
if *counter > 0 {
85+
let result = if self.inner.semaphore {
86+
// Semaphore mode: Decrement by 1
87+
*counter -= 1;
88+
1
89+
} else {
90+
// Standard mode: Drain the whole counter
91+
let val = *counter;
92+
*counter = 0;
93+
val
94+
};
95+
96+
if *counter == 0 {
97+
unsafe {
98+
ResetEvent(self.inner.event);
99+
}
100+
}
101+
return Ok(result);
102+
}
103+
if self.inner.nonblock {
104+
return Err(io::ErrorKind::WouldBlock.into());
105+
}
106+
} // Lock is dropped here before blocking so writers can make progress!
107+
108+
let ret = unsafe { WaitForSingleObject(self.inner.event, INFINITE) };
109+
if ret != WAIT_OBJECT_0 {
110+
return Err(io::Error::last_os_error());
111+
}
112+
}
113+
}
114+
115+
pub fn try_clone(&self) -> result::Result<EventFd, io::Error> {
116+
Ok(EventFd {
117+
inner: Arc::clone(&self.inner),
118+
})
119+
}
120+
121+
/// Waits up to `ms` milliseconds for the event to be signaled.
122+
///
123+
/// Returns `true` if the event was signaled, `false` on timeout.
124+
/// On signal, consumes one unit (semaphore mode) or drains the counter
125+
/// (standard mode). The kernel event is only reset when the counter
126+
/// reaches zero.
127+
pub fn wait_timeout(&self, ms: u32) -> bool {
128+
let result = unsafe { WaitForSingleObject(self.inner.event, ms) };
129+
if result == WAIT_OBJECT_0 {
130+
let mut counter = self.inner.counter.lock().unwrap();
131+
if *counter > 0 {
132+
if self.inner.semaphore {
133+
*counter -= 1;
134+
} else {
135+
*counter = 0;
136+
}
137+
if *counter == 0 {
138+
unsafe {
139+
ResetEvent(self.inner.event);
140+
}
141+
}
142+
}
143+
true
144+
} else {
145+
false
146+
}
147+
}
148+
}
149+
150+
impl AsRawFd for EventFd {
151+
fn as_raw_fd(&self) -> RawFd {
152+
self.inner.event
153+
}
154+
}
155+
156+
#[cfg(test)]
157+
mod tests {
158+
use super::*;
159+
160+
#[test]
161+
fn test_read_write() {
162+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
163+
evt.write(55).unwrap();
164+
assert_eq!(evt.read().unwrap(), 55);
165+
}
166+
167+
#[test]
168+
fn test_read_nothing_nonblock() {
169+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
170+
let res = evt.read();
171+
assert!(matches!(res, Err(err) if err.kind() == io::ErrorKind::WouldBlock));
172+
}
173+
174+
#[test]
175+
fn test_multiple_writes_accumulate() {
176+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
177+
evt.write(3).unwrap();
178+
evt.write(5).unwrap();
179+
assert_eq!(evt.read().unwrap(), 8);
180+
}
181+
182+
/// After read() drains the counter to 0, the kernel event must be
183+
/// unsignaled. If ResetEvent is missing, wait_timeout(0) would
184+
/// return true forever — the "infinite wakeup" bug.
185+
#[test]
186+
fn test_event_reset_after_read() {
187+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
188+
evt.write(1).unwrap();
189+
assert_eq!(evt.read().unwrap(), 1);
190+
assert!(
191+
!evt.wait_timeout(0),
192+
"kernel event should be unsignaled after drain"
193+
);
194+
}
195+
196+
/// Verify that writing after a full drain re-signals the event.
197+
#[test]
198+
fn test_write_read_cycle() {
199+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
200+
201+
evt.write(10).unwrap();
202+
assert_eq!(evt.read().unwrap(), 10);
203+
assert!(!evt.wait_timeout(0));
204+
205+
evt.write(20).unwrap();
206+
assert_eq!(evt.read().unwrap(), 20);
207+
assert!(!evt.wait_timeout(0));
208+
}
209+
210+
#[test]
211+
fn test_semaphore_mode() {
212+
let evt = EventFd::new(EFD_NONBLOCK | EFD_SEMAPHORE).unwrap();
213+
evt.write(3).unwrap();
214+
215+
assert_eq!(evt.read().unwrap(), 1);
216+
assert_eq!(evt.read().unwrap(), 1);
217+
assert_eq!(evt.read().unwrap(), 1);
218+
219+
let res = evt.read();
220+
assert!(matches!(res, Err(err) if err.kind() == io::ErrorKind::WouldBlock));
221+
}
222+
223+
/// In semaphore mode, the kernel event must stay signaled as long as
224+
/// the counter is > 0, and only unsignal on the final decrement.
225+
#[test]
226+
fn test_semaphore_event_stays_signaled() {
227+
let evt = EventFd::new(EFD_NONBLOCK | EFD_SEMAPHORE).unwrap();
228+
evt.write(3).unwrap();
229+
230+
assert_eq!(evt.read().unwrap(), 1); // counter: 3 -> 2
231+
assert!(
232+
evt.wait_timeout(0),
233+
"event should still be signaled with counter=2"
234+
);
235+
236+
// wait_timeout consumed one (counter: 2 -> 1)
237+
assert!(
238+
evt.wait_timeout(0),
239+
"event should still be signaled with counter=1"
240+
);
241+
242+
// wait_timeout consumed one (counter: 1 -> 0, ResetEvent)
243+
assert!(
244+
!evt.wait_timeout(0),
245+
"event should be unsignaled after full drain"
246+
);
247+
}
248+
249+
#[test]
250+
fn test_wait_timeout_not_signaled() {
251+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
252+
assert!(!evt.wait_timeout(0));
253+
}
254+
255+
#[test]
256+
fn test_wait_timeout_signaled() {
257+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
258+
evt.write(42).unwrap();
259+
assert!(evt.wait_timeout(0));
260+
}
261+
262+
#[test]
263+
fn test_clone() {
264+
let evt = EventFd::new(EFD_NONBLOCK).unwrap();
265+
let evt_clone = evt.try_clone().unwrap();
266+
267+
evt.write(923).unwrap();
268+
assert_eq!(evt_clone.read().unwrap(), 923);
269+
}
270+
}

src/utils/src/windows/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use windows_sys::Win32::Foundation::HANDLE;
22

33
pub(crate) mod bindings;
44
pub mod epoll;
5+
pub mod eventfd;
56

67
/// Cross-platform alias used by the rest of the codebase. On Windows this
78
/// is just [`HANDLE`] — the two names are interchangeable.

0 commit comments

Comments
 (0)