Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/sinks/util/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use futures::{StreamExt, stream::BoxStream};
use futures_util::stream::Peekable;
#[cfg(unix)]
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
#[cfg(unix)]
use tokio::net::UnixDatagram;
Expand Down Expand Up @@ -35,7 +36,11 @@ pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encodi
encoder: &mut E,
chunker: &Option<Chunker>,
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
send_error_max_duration: Option<Duration>,
) {
// Track when errors started to enforce send_error_max_duration
let mut error_start: Option<Instant> = None;

while let Some(mut event) = input.next().await {
transformer.transform(&mut event);
let finalizers = event.take_finalizers();
Expand Down Expand Up @@ -73,9 +78,34 @@ pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encodi
};

if delivered {
// Success: reset error tracking
error_start = None;
finalizers.update_status(EventStatus::Delivered);
} else {
// Error occurred
finalizers.update_status(EventStatus::Errored);

// Check if we should recreate socket due to error duration
if let Some(max_duration) = send_error_max_duration {
let now = Instant::now();
let error_duration = if let Some(start) = error_start {
now.duration_since(start)
} else {
// First error - start tracking
error_start = Some(now);
Duration::from_secs(0)
};

if error_duration >= max_duration {
warn!(
message = "UDP send errors exceeded max duration threshold; breaking to recreate socket.",
error_duration_secs = error_duration.as_secs(),
max_duration_secs = max_duration.as_secs(),
);
// Exit the loop to force socket recreation
return;
}
}
}
}
}
Expand Down
51 changes: 48 additions & 3 deletions src/sinks/util/udp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
pin::Pin,
time::Duration,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -40,6 +41,29 @@ pub enum UdpError {
DnsError { source: crate::dns::DnsError },
}

/// UDP-specific options.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct UdpOptions {
/// Maximum duration to tolerate send errors before recreating the socket.
///
/// When UDP send operations fail continuously for longer than this duration,
/// the socket will be closed and recreated to clear any cached kernel errors
/// (such as ICMP Port Unreachable). This is useful for recovering from
/// persistent connection failures where the destination becomes unavailable
/// and then recovers.
///
/// The error timer is reset to zero whenever a send succeeds.
///
/// If not set, the socket will remain open indefinitely regardless of errors,
/// which may prevent automatic recovery when destinations become available again.
#[configurable(metadata(docs::type_unit = "seconds"))]
#[configurable(metadata(docs::examples = 60))]
#[configurable(metadata(docs::examples = 300))]
#[serde(default)]
pub send_error_max_duration_secs: Option<u64>,
}

/// A UDP sink.
#[configurable_component]
#[derive(Clone, Debug)]
Expand All @@ -59,21 +83,32 @@ pub struct UdpSinkConfig {
#[configurable(metadata(docs::type_unit = "bytes"))]
#[configurable(metadata(docs::examples = 65536))]
send_buffer_bytes: Option<usize>,

/// UDP-specific configuration options.
#[configurable(derived)]
#[serde(default)]
udp: UdpOptions,
}

impl UdpSinkConfig {
pub const fn from_address(address: String) -> Self {
pub fn from_address(address: String) -> Self {
Self {
address,
send_buffer_bytes: None,
udp: UdpOptions::default(),
}
}

fn build_connector(&self) -> crate::Result<UdpConnector> {
let uri = self.address.parse::<http::Uri>()?;
let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
Ok(UdpConnector::new(host, port, self.send_buffer_bytes))
Ok(UdpConnector::new(
host,
port,
self.send_buffer_bytes,
self.udp.send_error_max_duration_secs.map(Duration::from_secs),
))
}

pub fn build(
Expand All @@ -100,14 +135,21 @@ struct UdpConnector {
host: String,
port: u16,
send_buffer_bytes: Option<usize>,
send_error_max_duration: Option<Duration>,
}

impl UdpConnector {
const fn new(host: String, port: u16, send_buffer_bytes: Option<usize>) -> Self {
const fn new(
host: String,
port: u16,
send_buffer_bytes: Option<usize>,
send_error_max_duration: Option<Duration>,
) -> Self {
Self {
host,
port,
send_buffer_bytes,
send_error_max_duration,
}
}

Expand Down Expand Up @@ -202,6 +244,8 @@ where

let mut encoder = self.encoder.clone();
let chunker = self.chunker.clone();
let send_error_max_duration = self.connector.send_error_max_duration;

while Pin::new(&mut input).peek().await.is_some() {
let socket = self.connector.connect_backoff().await;
send_datagrams(
Expand All @@ -211,6 +255,7 @@ where
&mut encoder,
&chunker,
&self.bytes_sent,
send_error_max_duration,
)
.await;
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/util/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ where
&mut encoder,
&None,
&bytes_sent,
None, // Unix sockets don't use send_error_max_duration
)
.await;
}
Expand Down