Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
/// Errors
/// - `IndexError::InvalidInput`: Either Key or Value is 0
/// - `IndexError::OutOfMemory`: Append after index file is already full.
pub fn append(&mut self, key: Key, value: u64) -> Result<(), IndexError> {
pub fn append(&mut self, key: Key, value: u64) -> Result<usize, IndexError> {
let key = key.into();
let last_key = self.last_key()?;
if last_key >= key {
Expand All @@ -179,7 +179,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
self.inner[start..start + KEY_SIZE].copy_from_slice(&key_bytes);
self.inner[start + KEY_SIZE..start + ENTRY_SIZE].copy_from_slice(&value_bytes);
self.num_entries += 1;
Ok(())
Ok(start)
}

/// Asynchronously flushes any pending changes to the index file
Expand All @@ -190,6 +190,21 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
self.inner.flush_async()
}

/// Asynchronously flushes the single index entry beginning at byte `offset`
/// back to the underlying index file.
///
/// On Linux, the underlying `msync` is a documented no-op, as the kernel
/// already tracks dirty pages and writes them back on its own.
///
/// See <https://man7.org/linux/man-pages/man2/msync.2.html> for details.
///
/// On macOS, `msync` is not documented as a no-op, and its documentation
/// explicitly states that only pages covered by the supplied address range
/// are examined. When flushing one entry at a time, prefer this over
/// [`Self::async_flush`], since it may avoid scanning pages across the
/// whole mapping.
pub fn async_flush_entry(&self, offset: usize) -> io::Result<()> {
    // Restrict the sync to exactly the byte range of one index entry.
    let len = ENTRY_SIZE;
    self.inner.flush_async_range(offset, len)
}

/// Truncates the index file starting from the entry with a key greater than
/// or equal to the given key.
///
Expand Down
9 changes: 3 additions & 6 deletions crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,10 @@ impl OffsetIndexWriter {
return Ok(());
}

self.head
let entry_offset = self
.head
.append(self.candidate_min_tx_offset, self.candidate_byte_offset)?;
self.head.async_flush()?;
self.head.async_flush_entry(entry_offset)?;
self.reset();

Ok(())
Expand All @@ -371,10 +372,6 @@ impl FileLike for OffsetIndexWriter {
let _ = self.append_internal().map_err(|e| {
warn!("failed to append to offset index: {e:?}");
});
let _ = self
.head
.async_flush()
.map_err(|e| warn!("failed to flush offset index: {e:?}"));
Comment on lines 372 to -377
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flush isn't needed since append_internal already flushes.

Ok(())
}

Expand Down
4 changes: 1 addition & 3 deletions crates/durability/src/imp/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,7 @@ where
let ready_len = tx_buf.len();
self.queue_depth.fetch_sub(ready_len as u64, Relaxed);
tx_buf = spawn_blocking(move || -> io::Result<Vec<PreparedTx<Txdata<T>>>> {
for tx in tx_buf.drain(..) {
clog.commit([tx.into_transaction()])?;
}
clog.commit(tx_buf.drain(..).map(|tx| tx.into_transaction()))?;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could include quite a lot of transactions, up to the size of the durability queue size. Not sure if this should be bounded further or if it matters at all.

Ok(tx_buf)
})
.await
Expand Down
Loading