Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/block_read_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ pub trait BlockWrite {
fn num_bytes_remaining_in_block(&self) -> usize;
}

#[cfg(test)]
pub struct ArrayReader<'a> {
block: [u8; BLOCK_NUM_BYTES],
data: &'a [u8],
}

#[cfg(test)]
impl<'a> From<&'a [u8]> for ArrayReader<'a> {
fn from(data: &'a [u8]) -> Self {
assert!(data.len() >= BLOCK_NUM_BYTES);
Expand All @@ -48,6 +50,7 @@ impl<'a> From<&'a [u8]> for ArrayReader<'a> {
}
}

#[cfg(test)]
impl BlockRead for ArrayReader<'_> {
fn next_block(&mut self) -> io::Result<bool> {
if self.data.len() < BLOCK_NUM_BYTES {
Expand Down
15 changes: 11 additions & 4 deletions src/frame/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,28 @@ impl<W: BlockWrite + Unpin> FrameWriter<W> {
}

/// Writes a frame. The payload has to be lower than the
/// remaining space in the frame as defined
/// by `max_writable_frame_length`.
pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<()> {
/// remaining space in the frame as defined by `max_writable_frame_length`.
///
/// Returns the number of bytes pushed to the underlying
/// writer (header + payload, plus any zero-padding written to close out the current block).
pub fn write_frame(&mut self, frame_type: FrameType, payload: &[u8]) -> io::Result<usize> {
let mut num_bytes_written = 0;
let num_bytes_remaining_in_block = self.wrt.num_bytes_remaining_in_block();

if num_bytes_remaining_in_block < HEADER_LEN {
let zero_bytes = [0u8; HEADER_LEN];
self.wrt
.write(&zero_bytes[..num_bytes_remaining_in_block])?;
num_bytes_written += num_bytes_remaining_in_block;
}
let record_len = HEADER_LEN + payload.len();
let (buffer_header, buffer_record) = self.buffer[..record_len].split_at_mut(HEADER_LEN);
buffer_record.copy_from_slice(payload);
Header::for_payload(frame_type, payload).serialize(buffer_header);
self.wrt.write(&self.buffer[..record_len])?;
Ok(())

num_bytes_written += record_len;
Ok(num_bytes_written)
}

/// Flush the buffered writer used in the FrameWriter.
Expand Down
30 changes: 30 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,36 @@ pub struct ResourceUsage {
pub disk_used_bytes: usize,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AppendOutcome {
/// Position of the last record appended, or `None` for an idempotent no-op
/// (empty payloads, or `position_opt` already past the queue's head).
pub last_position: Option<u64>,
/// Bytes appended to the WAL: frame headers + payload + any end-of-block padding
/// written ahead of the record. Two identical calls may report different values
/// depending on where the write cursor sat within the current block (a frame crossing
/// a block boundary incurs padding). `0` for an idempotent no-op.
pub wal_bytes_written: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TruncateOutcome {
/// Number of records evicted from the in-memory queue by this call. This is a delta:
/// a truncate at a position already covered by a previous truncate reports `0`.
pub evicted_records: usize,
pub wal_bytes_written: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DeleteQueueOutcome {
pub wal_bytes_written: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CreateQueueOutcome {
pub wal_bytes_written: u64,
}

#[cfg(test)]
mod tests;

Expand Down
92 changes: 58 additions & 34 deletions src/multi_record_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::mem::{MemQueue, QueuesSummary};
use crate::record::{MultiPlexedRecord, MultiRecord};
use crate::recordlog::RecordWriter;
use crate::rolling::RollingWriter;
use crate::{mem, PersistAction, PersistPolicy, PersistState, Record, ResourceUsage};
use crate::{
mem, AppendOutcome, CreateQueueOutcome, DeleteQueueOutcome, PersistAction, PersistPolicy,
PersistState, Record, ResourceUsage, TruncateOutcome,
};

pub struct MultiRecordLog {
record_log_writer: crate::recordlog::RecordWriter<RollingWriter>,
Expand Down Expand Up @@ -99,7 +102,8 @@ impl MultiRecordLog {
next_persist: persist_policy.into(),
multi_record_spare_buffer: Vec::new(),
};
multi_record_log.run_gc_if_necessary()?;
// Bytes written by recovery-time GC are not surfaced to any user-facing API.
let _ = multi_record_log.run_gc_if_necessary()?;
Ok(multi_record_log)
}

Expand All @@ -112,27 +116,31 @@ impl MultiRecordLog {
/// Creates a new queue.
///
/// Returns an error if the queue already exists.
pub fn create_queue(&mut self, queue: &str) -> Result<(), CreateQueueError> {
pub fn create_queue(&mut self, queue: &str) -> Result<CreateQueueOutcome, CreateQueueError> {
info!(queue = queue, "create queue");
if self.queue_exists(queue) {
return Err(CreateQueueError::AlreadyExists);
}
let record = MultiPlexedRecord::RecordPosition { queue, position: 0 };
self.record_log_writer.write_record(record)?;
let num_bytes_written = self.record_log_writer.write_record(record)?;
self.persist(PersistAction::FlushAndFsync)?;
self.in_mem_queues.create_queue(queue)?;
Ok(())
Ok(CreateQueueOutcome {
wal_bytes_written: num_bytes_written,
})
}

pub fn delete_queue(&mut self, queue: &str) -> Result<(), DeleteQueueError> {
pub fn delete_queue(&mut self, queue: &str) -> Result<DeleteQueueOutcome, DeleteQueueError> {
info!(queue = queue, "delete queue");
let position = self.in_mem_queues.next_position(queue)?;
let record = MultiPlexedRecord::DeleteQueue { queue, position };
self.record_log_writer.write_record(record)?;
let mut num_bytes_written = self.record_log_writer.write_record(record)?;
self.in_mem_queues.delete_queue(queue)?;
self.run_gc_if_necessary()?;
num_bytes_written += self.run_gc_if_necessary()?;
self.persist(PersistAction::FlushAndFsync)?;
Ok(())
Ok(DeleteQueueOutcome {
wal_bytes_written: num_bytes_written,
})
}

pub fn queue_exists(&self, queue: &str) -> bool {
Expand All @@ -153,7 +161,7 @@ impl MultiRecordLog {
queue: &str,
position_opt: Option<u64>,
payload: impl Buf,
) -> Result<Option<u64>, AppendError> {
) -> Result<AppendOutcome, AppendError> {
self.append_records(queue, position_opt, std::iter::once(payload))
}

Expand All @@ -168,12 +176,15 @@ impl MultiRecordLog {
queue: &str,
position_opt: Option<u64>,
payloads: T,
) -> Result<Option<u64>, AppendError> {
) -> Result<AppendOutcome, AppendError> {
let next_position = self.in_mem_queues.next_position(queue)?;
if let Some(position) = position_opt {
// we accept position in the future, and move forward as required.
if position + 1 == next_position {
return Ok(None);
return Ok(AppendOutcome {
last_position: None,
wal_bytes_written: 0,
});
} else if position < next_position {
return Err(AppendError::Past);
}
Expand All @@ -186,7 +197,10 @@ impl MultiRecordLog {
if multi_record_spare_buffer.is_empty() {
self.multi_record_spare_buffer = multi_record_spare_buffer;
// empty transaction: don't persist it
return Ok(None);
return Ok(AppendOutcome {
last_position: None,
wal_bytes_written: 0,
});
}

let records = MultiRecord::new_unchecked(&multi_record_spare_buffer);
Expand All @@ -195,7 +209,7 @@ impl MultiRecordLog {
queue,
records,
};
self.record_log_writer.write_record(record)?;
let num_bytes_written = self.record_log_writer.write_record(record)?;
self.persist_on_policy()?;

let mem_queue = self.in_mem_queues.get_queue_mut(queue)?;
Expand All @@ -208,59 +222,69 @@ impl MultiRecordLog {
}

self.multi_record_spare_buffer = multi_record_spare_buffer;
Ok(Some(max_position))
Ok(AppendOutcome {
last_position: Some(max_position),
wal_bytes_written: num_bytes_written,
})
}

fn record_empty_queues_position(&mut self) -> io::Result<()> {
let mut has_empty_queues = false;
fn record_empty_queues_position(&mut self) -> io::Result<u64> {
let mut num_bytes_written: u64 = 0;

for (queue_id, queue) in self.in_mem_queues.empty_queues() {
let next_position = queue.next_position();
let record = MultiPlexedRecord::RecordPosition {
queue: queue_id,
position: next_position,
};
self.record_log_writer.write_record(record)?;
has_empty_queues = true
num_bytes_written += self.record_log_writer.write_record(record)?;
}
if has_empty_queues {
if num_bytes_written > 0 {
// We need to fsync here! We are remove files from the FS
// so we need to make sure our empty queue positions are properly persisted.
self.persist(PersistAction::FlushAndFsync)?;
}
Ok(())
Ok(num_bytes_written)
}

/// Truncates the queue up to a given `position`, included. This method immediately
/// truncates the underlying in-memory queue whereas the backing log files are deleted
/// asynchronously when they become exclusively composed of deleted records.
///
/// This method will always truncate the record log and release the associated memory.
/// It returns the number of records deleted.
pub fn truncate(
&mut self,
queue: &str,
truncate_range: RangeToInclusive<u64>,
) -> Result<usize, TruncateError> {
) -> Result<TruncateOutcome, TruncateError> {
info!(range=?truncate_range, queue = queue, "truncate queue");
if !self.queue_exists(queue) {
return Err(TruncateError::MissingQueue(queue.to_string()));
}
self.record_log_writer
.write_record(MultiPlexedRecord::Truncate {
truncate_range,
queue,
})?;
let removed_count = self
let mut num_bytes_written =
self.record_log_writer
.write_record(MultiPlexedRecord::Truncate {
truncate_range,
queue,
})?;
let evicted_records = self
.in_mem_queues
.truncate(queue, truncate_range)
.unwrap_or(0);
self.run_gc_if_necessary()?;
num_bytes_written += self.run_gc_if_necessary()?;
self.persist_on_policy()?;
Ok(removed_count)
Ok(TruncateOutcome {
evicted_records,
wal_bytes_written: num_bytes_written,
})
}

fn run_gc_if_necessary(&mut self) -> io::Result<()> {
/// Returns the number of bytes the GC pass appended to the WAL — empty-queue position
/// records, if any. Returns 0 when there's no GC work to do.
fn run_gc_if_necessary(&mut self) -> io::Result<u64> {
debug!("run_gc_if_necessary");
let mut num_bytes_written = 0;

if self
.record_log_writer
.directory()
Expand All @@ -273,7 +297,7 @@ impl MultiRecordLog {
// But first we clone the current file number to make sure that the file that will
// contain the truncate positions it self won't be GC'ed.
let _file_number = self.record_log_writer.current_file().clone();
self.record_empty_queues_position()?;
num_bytes_written += self.record_empty_queues_position()?;
self.record_log_writer.directory().gc()?;
}
// only execute the following if we are above the debug level in tokio tracing
Expand All @@ -285,7 +309,7 @@ impl MultiRecordLog {
debug!(first_pos=?first_pos, last_pos=?last_pos, "queue positions after gc");
}
}
Ok(())
Ok(num_bytes_written)
}

pub fn range<R>(
Expand Down
21 changes: 15 additions & 6 deletions src/proptests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ impl PropTestEnv {
.record_log
.append_records(queue, Some(new_pos), std::iter::once(&b"BB"[..]))
.unwrap()
.last_position
.unwrap();

assert!(self
.record_log
.append_records(queue, Some(new_pos), std::iter::once(&b"BB"[..]))
.unwrap()
.last_position
.is_none());

assert_eq!(new_pos, res);
Expand All @@ -93,7 +95,7 @@ impl PropTestEnv {
let state = self.state.get_mut(queue).unwrap();

let new_pos = state.0.end + skip_one_pos as u64;
let res = self
let outcome = self
.record_log
.append_records(
queue,
Expand All @@ -103,7 +105,7 @@ impl PropTestEnv {
.unwrap();

if count != 0 {
let res = res.unwrap();
let res = outcome.last_position.unwrap();
assert_eq!(new_pos + count - 1, res);
state.0.end = new_pos + count;
state.1 += count;
Expand All @@ -114,7 +116,11 @@ impl PropTestEnv {
let state = self.state.get_mut(queue).unwrap();
if state.0.contains(&pos) {
state.0.start = pos + 1;
state.1 -= self.record_log.truncate(queue, ..=pos).unwrap() as u64;
state.1 -= self
.record_log
.truncate(queue, ..=pos)
.unwrap()
.evicted_records as u64;
} else if pos >= state.0.end {
// advance the queue to the position.
state.0 = (pos + 1)..(pos + 1);
Expand Down Expand Up @@ -342,7 +348,8 @@ fn test_multi_record() {
assert_eq!(
multi_record_log
.append_record("queue", None, &b"1"[..])
.unwrap(),
.unwrap()
.last_position,
Some(0)
);
}
Expand All @@ -351,7 +358,8 @@ fn test_multi_record() {
assert_eq!(
multi_record_log
.append_record("queue", None, &b"22"[..])
.unwrap(),
.unwrap()
.last_position,
Some(1)
);
}
Expand All @@ -374,7 +382,8 @@ fn test_multi_record() {
assert_eq!(
multi_record_log
.append_record("queue", None, &b"hello"[..])
.unwrap(),
.unwrap()
.last_position,
Some(2)
);
}
Expand Down
Loading
Loading