From 94e7048e51d65ca3e6db11507e634324af5f5e44 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Wed, 13 May 2026 17:56:17 -0700 Subject: [PATCH] Batch multiple transactions in a single commit in the commitlog Also pass a range to msync when writing entries to the segment offset index file, to be explicit and avoid flushing/examining unnecessary pages. --- crates/commitlog/src/index/indexfile.rs | 19 +++++++++++++++++-- crates/commitlog/src/segment.rs | 9 +++------ crates/durability/src/imp/local.rs | 4 +--- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/crates/commitlog/src/index/indexfile.rs b/crates/commitlog/src/index/indexfile.rs index 080fac97472..338c3e4552a 100644 --- a/crates/commitlog/src/index/indexfile.rs +++ b/crates/commitlog/src/index/indexfile.rs @@ -161,7 +161,7 @@ impl + From> IndexFileMut { /// Errors /// - `IndexError::InvalidInput`: Either Key or Value is 0 /// - `IndexError::OutOfMemory`: Append after index file is already full. - pub fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> { + pub fn append(&mut self, key: Key, value: u64) -> Result { let key = key.into(); let last_key = self.last_key()?; if last_key >= key { @@ -179,7 +179,7 @@ impl + From> IndexFileMut { self.inner[start..start + KEY_SIZE].copy_from_slice(&key_bytes); self.inner[start + KEY_SIZE..start + ENTRY_SIZE].copy_from_slice(&value_bytes); self.num_entries += 1; - Ok(()) + Ok(start) } /// Asynchronously flushes any pending changes to the index file @@ -190,6 +190,21 @@ impl + From> IndexFileMut { self.inner.flush_async() } + /// Asynchronously flushes the index entry starting at `offset` to the index file. + /// + /// On linux, the underlying `msync` is a documented no-op since the kernel already + /// tracks dirty pages and flushes them as needed. + /// + /// See https://man7.org/linux/man-pages/man2/msync.2.html for details. + /// + /// On macOS, it is not a documented no-op, and it explicitly states that `msync` + /// will only examine pages covered by the provided address range. Hence this should + /// be preferred over [`Self::async_flush`] when only flushing a single entry at a + /// time, since it may avoid examining pages across the whole mapping. + pub fn async_flush_entry(&self, offset: usize) -> io::Result<()> { + self.inner.flush_async_range(offset, ENTRY_SIZE) + } + /// Truncates the index file starting from the entry with a key greater than /// or equal to the given key. /// diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 7913a3799e5..d4f85bf8fa0 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -356,9 +356,10 @@ impl OffsetIndexWriter { return Ok(()); } - self.head + let entry_offset = self + .head .append(self.candidate_min_tx_offset, self.candidate_byte_offset)?; - self.head.async_flush()?; + self.head.async_flush_entry(entry_offset)?; self.reset(); Ok(()) @@ -371,10 +372,6 @@ impl FileLike for OffsetIndexWriter { let _ = self.append_internal().map_err(|e| { warn!("failed to append to offset index: {e:?}"); }); - let _ = self - .head - .async_flush() - .map_err(|e| warn!("failed to flush offset index: {e:?}")); Ok(()) } diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 04d46d8f634..f140cc2ce31 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -282,9 +282,7 @@ where let ready_len = tx_buf.len(); self.queue_depth.fetch_sub(ready_len as u64, Relaxed); tx_buf = spawn_blocking(move || -> io::Result>>> { - for tx in tx_buf.drain(..) { - clog.commit([tx.into_transaction()])?; - } + clog.commit(tx_buf.drain(..).map(|tx| tx.into_transaction()))?; Ok(tx_buf) }) .await