diff --git a/crates/commitlog/src/index/indexfile.rs b/crates/commitlog/src/index/indexfile.rs index 080fac97472..338c3e4552a 100644 --- a/crates/commitlog/src/index/indexfile.rs +++ b/crates/commitlog/src/index/indexfile.rs @@ -161,7 +161,7 @@ impl + From> IndexFileMut { /// Errors /// - `IndexError::InvalidInput`: Either Key or Value is 0 /// - `IndexError::OutOfMemory`: Append after index file is already full. - pub fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> { + pub fn append(&mut self, key: Key, value: u64) -> Result { let key = key.into(); let last_key = self.last_key()?; if last_key >= key { @@ -179,7 +179,7 @@ impl + From> IndexFileMut { self.inner[start..start + KEY_SIZE].copy_from_slice(&key_bytes); self.inner[start + KEY_SIZE..start + ENTRY_SIZE].copy_from_slice(&value_bytes); self.num_entries += 1; - Ok(()) + Ok(start) } /// Asynchronously flushes any pending changes to the index file @@ -190,6 +190,21 @@ impl + From> IndexFileMut { self.inner.flush_async() } + /// Asynchronously flushes the index entry starting at `offset` to the index file. + /// + /// On linux, the underlying `msync` is a documented no-op since the kernel already + /// tracks dirty pages and flushes them as needed. + /// + /// See https://man7.org/linux/man-pages/man2/msync.2.html for details. + /// + /// On macOS, it is not a documented no-op, and it explicitly states that `msync` + /// will only examine pages covered by the provided address range. Hence this should + /// be preferred over [`Self::async_flush`] when only flushing a single entry at a + /// time, since it may avoid examining pages across the whole mapping. + pub fn async_flush_entry(&self, offset: usize) -> io::Result<()> { + self.inner.flush_async_range(offset, ENTRY_SIZE) + } + /// Truncates the index file starting from the entry with a key greater than /// or equal to the given key. /// diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index abc8729c978..0f0753f0f36 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -109,7 +109,7 @@ impl Default for Options { impl Options { pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024; pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); - pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; + pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = true; pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 8 * 1024; diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7913a3799e5..d4f85bf8fa0 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -356,9 +356,10 @@ impl OffsetIndexWriter { return Ok(()); } - self.head + let entry_offset = self + .head .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?; - self.head.async_flush()?; + self.head.async_flush_entry(entry_offset)?; self.reset(); Ok(()) @@ -371,10 +372,6 @@ impl FileLike for OffsetIndexWriter { let _ = self.append_internal().map_err(|e| { warn!("failed to append to offset index: {e:?}"); }); - let _ = self - .head - .async_flush() - .map_err(|e| warn!("failed to flush offset index: {e:?}")); Ok(()) }