Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/uu/wc/src/count_fast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn count_bytes_using_splice(fd: &impl AsFd) -> Result<usize, usize> {
}
} else {
// input is not pipe. needs broker to use splice() with additional cost
let (pipe_rd, pipe_wr) = pipe().map_err(|_| 0_usize)?;
let (pipe_rd, pipe_wr) = pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).map_err(|_| 0_usize)?;
loop {
match splice(fd, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(byte_count),
Expand Down
4 changes: 2 additions & 2 deletions src/uu/yes/src/yes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ pub fn exec(mut bytes: Vec<u8>) -> io::Result<()> {
// improve throughput
let _ = rustix::pipe::fcntl_setpipe_size(&stdout, MAX_ROOTLESS_PIPE_SIZE);
// don't show any error from fast-path and fallback to write for proper message
if let Ok((p_read, mut p_write)) = pipe()
if let Ok((p_read, mut p_write)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE)
&& p_write.write_all(bytes).is_ok()
{
if aligned && tee(&p_read, &stdout, MAX_ROOTLESS_PIPE_SIZE).is_ok() {
while let Ok(1..) = tee(&p_read, &stdout, MAX_ROOTLESS_PIPE_SIZE) {}
} else if let Ok((broker_read, broker_write)) = pipe() {
} else if let Ok((broker_read, broker_write)) = pipe::<true>(MAX_ROOTLESS_PIPE_SIZE) {
// tee() cannot control offset and write to non-pipe
'hybrid: while let Ok(mut remain) = tee(&p_read, &broker_write, MAX_ROOTLESS_PIPE_SIZE)
{
Expand Down
11 changes: 6 additions & 5 deletions src/uucore/src/lib/features/buf_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ mod tests {

#[cfg(target_os = "linux")]
use {
crate::pipes,
std::fs::OpenOptions,
std::{
io::{Seek, SeekFrom},
Expand All @@ -50,11 +49,13 @@ mod tests {
}

#[test]
#[cfg(target_os = "linux")]
#[cfg(unix)]
fn test_copy_stream() {
let mut dest_file = new_temp_file();

let (mut pipe_read, mut pipe_write) = pipes::pipe().unwrap();
let (pipe_read, pipe_write) = rustix::pipe::pipe().unwrap();
let mut pipe_read: File = pipe_read.into();
let mut pipe_write: File = pipe_write.into();
let data = b"Hello, world!";
let thread = thread::spawn(move || {
pipe_write.write_all(data).unwrap();
Expand All @@ -72,8 +73,8 @@ mod tests {
}

#[test]
#[cfg(not(target_os = "linux"))]
// Test for non-linux platforms. We use regular files instead.
#[cfg(not(unix))]
// Test for non-unix platforms. We use regular files instead.
fn test_copy_stream() {
let temp_dir = tempdir().unwrap();
let src_path = temp_dir.path().join("src.txt");
Expand Down
34 changes: 14 additions & 20 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,21 @@ pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
#[cfg(any(target_os = "linux", target_os = "android"))]
const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;

/// A wrapper around [`rustix::pipe::pipe`] that ensures the pipe is cleaned up.
/// return pipe larger than given size
/// SIZE_REQUIRED should be true if you want to fail when changing pipe size failed
/// e.g. writing size to pipe should not hang
///
/// Returns two `File` objects: everything written to the second can be read
/// from the first.
/// used for resolving the limitation for splice: one of a input or output should be pipe
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn pipe() -> std::io::Result<(File, File)> {
let (read, write) = rustix::pipe::pipe()?;
// improve performance for splice
let _ = fcntl_setpipe_size(&read, MAX_ROOTLESS_PIPE_SIZE);

Ok((File::from(read), File::from(write)))
}

/// return pipe larger than given size and kernel's default size
///
/// useful to save RAM usage
#[inline]
#[cfg(any(target_os = "linux", target_os = "android"))]
fn pipe_with_size(s: usize) -> std::io::Result<(File, File)> {
pub fn pipe<const SIZE_REQUIRED: bool>(s: usize) -> std::io::Result<(File, File)> {
Comment thread
oech3 marked this conversation as resolved.
let (read, write) = rustix::pipe::pipe()?;
// guard unnecessary syscall
if s > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&read, s);
let r = fcntl_setpipe_size(&read, s);
if SIZE_REQUIRED {
r?;
}
}

Ok((File::from(read), File::from(write)))
Expand Down Expand Up @@ -126,7 +117,10 @@ where
S: AsFd,
{
static PIPE_CACHE: OnceLock<Option<(File, File)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe().ok()).as_ref() else {
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE
.get_or_init(|| pipe::<false>(MAX_ROOTLESS_PIPE_SIZE).ok())
.as_ref()
else {
return Ok(true);
};
// improve throughput
Expand Down Expand Up @@ -197,7 +191,7 @@ pub fn send_n_bytes(
}
}
} else if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| pipe_with_size(pipe_size).ok())
.get_or_init(|| pipe::<false>(pipe_size).ok())
.as_ref()
{
// todo: create fn splice_bounded_broker
Expand Down
7 changes: 4 additions & 3 deletions tests/by-util/test_comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,14 @@ fn test_output_lossy_utf8() {
#[cfg_attr(wasi_runner, ignore = "WASI sandbox: host paths not visible")]
fn test_comm_anonymous_pipes() {
use std::{io::Write, os::fd::AsRawFd, process};
use uucore::pipes::pipe;

let scene = TestScenario::new(util_name!());

// Open two anonymous pipes
let (comm1_reader, mut comm1_writer) = pipe().unwrap();
let (comm2_reader, mut comm2_writer) = pipe().unwrap();
let (comm1_reader, comm1_writer) = rustix::pipe::pipe().unwrap();
let mut comm1_writer: std::fs::File = comm1_writer.into();
let (comm2_reader, comm2_writer) = rustix::pipe::pipe().unwrap();
let mut comm2_writer: std::fs::File = comm2_writer.into();

// comm reads the data in chunks
// make content large enough, so that at least two chunks are read
Expand Down
Loading