From f0579ea701c2f6b03a5d7d0ca8c87efa846b4e5a Mon Sep 17 00:00:00 2001 From: Jeff Cantrill Date: Thu, 2 Jul 2026 15:37:10 -0400 Subject: [PATCH] feat(udp.send_error_max_duration_secs): add duration to recreate socket ref: LOG-9610 --- src/sinks/util/datagram.rs | 30 ++++++++++++++++++++++ src/sinks/util/udp.rs | 51 +++++++++++++++++++++++++++++++++++--- src/sinks/util/unix.rs | 1 + 3 files changed, 79 insertions(+), 3 deletions(-) diff --git a/src/sinks/util/datagram.rs b/src/sinks/util/datagram.rs index 9fe37f761477a..76d300f7f9aac 100644 --- a/src/sinks/util/datagram.rs +++ b/src/sinks/util/datagram.rs @@ -3,6 +3,7 @@ use futures::{StreamExt, stream::BoxStream}; use futures_util::stream::Peekable; #[cfg(unix)] use std::path::PathBuf; +use std::time::{Duration, Instant}; use tokio::net::UdpSocket; #[cfg(unix)] use tokio::net::UnixDatagram; @@ -35,7 +36,11 @@ pub async fn send_datagrams, bytes_sent: &::Handle, + send_error_max_duration: Option, ) { + // Track when errors started to enforce send_error_max_duration + let mut error_start: Option = None; + while let Some(mut event) = input.next().await { transformer.transform(&mut event); let finalizers = event.take_finalizers(); @@ -73,9 +78,34 @@ pub async fn send_datagrams= max_duration { + warn!( + message = "UDP send errors exceeded max duration threshold; breaking to recreate socket.", + error_duration_secs = error_duration.as_secs(), + max_duration_secs = max_duration.as_secs(), + ); + // Exit the loop to force socket recreation + return; + } + } } } } diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs index 0b9441e682cde..834f30381ea7b 100644 --- a/src/sinks/util/udp.rs +++ b/src/sinks/util/udp.rs @@ -1,6 +1,7 @@ use std::{ net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, pin::Pin, + time::Duration, }; use async_trait::async_trait; @@ -40,6 +41,29 @@ pub enum UdpError { DnsError { source: crate::dns::DnsError }, } +/// UDP-specific options. +#[configurable_component] +#[derive(Clone, Debug, Default)] +pub struct UdpOptions { + /// Maximum duration to tolerate send errors before recreating the socket. + /// + /// When UDP send operations fail continuously for longer than this duration, + /// the socket will be closed and recreated to clear any cached kernel errors + /// (such as ICMP Port Unreachable). This is useful for recovering from + /// persistent connection failures where the destination becomes unavailable + /// and then recovers. + /// + /// The error timer is reset to zero whenever a send succeeds. + /// + /// If not set, the socket will remain open indefinitely regardless of errors, + /// which may prevent automatic recovery when destinations become available again. + #[configurable(metadata(docs::type_unit = "seconds"))] + #[configurable(metadata(docs::examples = 60))] + #[configurable(metadata(docs::examples = 300))] + #[serde(default)] + pub send_error_max_duration_secs: Option, +} + /// A UDP sink. #[configurable_component] #[derive(Clone, Debug)] @@ -59,13 +83,19 @@ pub struct UdpSinkConfig { #[configurable(metadata(docs::type_unit = "bytes"))] #[configurable(metadata(docs::examples = 65536))] send_buffer_bytes: Option, + + /// UDP-specific configuration options. + #[configurable(derived)] + #[serde(default)] + udp: UdpOptions, } impl UdpSinkConfig { - pub const fn from_address(address: String) -> Self { + pub fn from_address(address: String) -> Self { Self { address, send_buffer_bytes: None, + udp: UdpOptions::default(), } } @@ -73,7 +103,12 @@ impl UdpSinkConfig { let uri = self.address.parse::()?; let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string(); let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?; - Ok(UdpConnector::new(host, port, self.send_buffer_bytes)) + Ok(UdpConnector::new( + host, + port, + self.send_buffer_bytes, + self.udp.send_error_max_duration_secs.map(Duration::from_secs), + )) } pub fn build( @@ -100,14 +135,21 @@ struct UdpConnector { host: String, port: u16, send_buffer_bytes: Option, + send_error_max_duration: Option, } impl UdpConnector { - const fn new(host: String, port: u16, send_buffer_bytes: Option) -> Self { + const fn new( + host: String, + port: u16, + send_buffer_bytes: Option, + send_error_max_duration: Option, + ) -> Self { Self { host, port, send_buffer_bytes, + send_error_max_duration, } } @@ -202,6 +244,8 @@ where let mut encoder = self.encoder.clone(); let chunker = self.chunker.clone(); + let send_error_max_duration = self.connector.send_error_max_duration; + while Pin::new(&mut input).peek().await.is_some() { let socket = self.connector.connect_backoff().await; send_datagrams( @@ -211,6 +255,7 @@ where &mut encoder, &chunker, &self.bytes_sent, + send_error_max_duration, ) .await; } diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index cb9dadb934b3d..a48f3fbd43bf8 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -277,6 +277,7 @@ where &mut encoder, &None, &bytes_sent, + None, // Unix sockets don't use send_error_max_duration ) .await; }