diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 0d4e81f2..80e42e27 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -21,6 +21,7 @@ const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled"; const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled"; const GLOBAL_INDEX_ENABLED_OPTION: &str = "global-index.enabled"; const GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION: &str = "global-index.row-count-per-shard"; +const GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION: &str = "global-index.column-update-action"; const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size"; const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost"; const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name"; @@ -46,6 +47,10 @@ const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format"; const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression"; const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode"; const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; +const MANIFEST_COMPRESSION_OPTION: &str = "manifest.compression"; +const MANIFEST_TARGET_FILE_SIZE_OPTION: &str = "manifest.target-file-size"; +const MANIFEST_TARGET_SIZE_OPTION: &str = "manifest.target-size"; +const MANIFEST_MERGE_MIN_COUNT_OPTION: &str = "manifest.merge-min-count"; const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size"; pub(crate) const SEQUENCE_FIELD_OPTION: &str = "sequence.field"; pub(crate) const DISABLE_EXPLICIT_TYPE_CASTING_OPTION: &str = "disable-explicit-type-casting"; @@ -62,6 +67,9 @@ pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis"; pub const SCAN_VERSION_OPTION: &str = "scan.version"; const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024; const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; +const DEFAULT_MANIFEST_COMPRESSION: &str = "zstd"; +const DEFAULT_MANIFEST_TARGET_FILE_SIZE: i64 = 8 * 1024 * 1024; +const DEFAULT_MANIFEST_MERGE_MIN_COUNT: usize = 30; const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-"; const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024; @@ -102,6 +110,13 @@ pub enum ChangelogProducer { Lookup, } +/// Action when a partial-column update touches globally indexed columns. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum GlobalIndexColumnUpdateAction { + ThrowError, + DropPartitionIndex, +} + /// Bucket function used to map bucket keys to fixed bucket ids. /// /// Reference: Java `CoreOptions.BucketFunctionType`. @@ -263,6 +278,24 @@ impl<'a> CoreOptions<'a> { Ok(value) } + pub fn global_index_column_update_action( + &self, + ) -> crate::Result { + match self + .options + .get(GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION) + .map(|v| v.to_ascii_uppercase()) + .as_deref() + .unwrap_or("THROW_ERROR") + { + "THROW_ERROR" => Ok(GlobalIndexColumnUpdateAction::ThrowError), + "DROP_PARTITION_INDEX" => Ok(GlobalIndexColumnUpdateAction::DropPartitionIndex), + other => Err(crate::Error::ConfigInvalid { + message: format!("Unsupported global-index.column-update-action: {other}"), + }), + } + } + pub fn source_split_target_size(&self) -> i64 { self.options .get(SOURCE_SPLIT_TARGET_SIZE_OPTION) @@ -400,6 +433,29 @@ impl<'a> CoreOptions<'a> { .unwrap_or(false) } + /// Suggested target size for a manifest file. Default is 8 MiB. + /// + /// `manifest.target-file-size` is the Java/Python option. The shorter + /// `manifest.target-size` alias is accepted for older Rust callers that + /// used the name from early parity discussions. + pub fn manifest_target_size(&self) -> i64 { + self.options + .get(MANIFEST_TARGET_FILE_SIZE_OPTION) + .or_else(|| self.options.get(MANIFEST_TARGET_SIZE_OPTION)) + .and_then(|v| parse_memory_size(v)) + .unwrap_or(DEFAULT_MANIFEST_TARGET_FILE_SIZE) + } + + /// Minimum number of small manifest files required before minor manifest + /// compaction rewrites them into a new rolling manifest set. + pub fn manifest_merge_min_count(&self) -> usize { + self.options + .get(MANIFEST_MERGE_MIN_COUNT_OPTION) + .and_then(|v| v.parse().ok()) + .filter(|v| *v > 0) + .unwrap_or(DEFAULT_MANIFEST_MERGE_MIN_COUNT) + } + /// Number of buckets for the table. Default is 1. pub fn bucket(&self) -> i32 { self.options @@ -505,6 +561,15 @@ impl<'a> CoreOptions<'a> { .map(String::as_str) } + /// Avro compression codec for manifest, manifest-list and index-manifest files. + /// Default is `"zstd"`, matching Java Paimon `CoreOptions.MANIFEST_COMPRESSION`. + pub fn manifest_compression(&self) -> &str { + self.options + .get(MANIFEST_COMPRESSION_OPTION) + .map(String::as_str) + .unwrap_or(DEFAULT_MANIFEST_COMPRESSION) + } + /// Parquet writer in-progress buffer size limit. Default is 256MB. /// When the buffered data exceeds this, the writer flushes the current row group. pub fn write_parquet_buffer_size(&self) -> i64 { @@ -588,6 +653,10 @@ mod tests { core_options.global_index_row_count_per_shard().unwrap(), 100_000 ); + assert_eq!( + core_options.global_index_column_update_action().unwrap(), + GlobalIndexColumnUpdateAction::ThrowError + ); } #[test] @@ -605,6 +674,10 @@ mod tests { GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION.to_string(), "2048".to_string(), ), + ( + GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION.to_string(), + "DROP_PARTITION_INDEX".to_string(), + ), ]); let core_options = CoreOptions::new(&options); @@ -614,6 +687,10 @@ mod tests { core_options.global_index_row_count_per_shard().unwrap(), 2048 ); + assert_eq!( + core_options.global_index_column_update_action().unwrap(), + GlobalIndexColumnUpdateAction::DropPartitionIndex + ); } #[test] @@ -815,6 +892,9 @@ mod tests { assert_eq!(core.commit_min_retry_wait_ms(), 1_000); assert_eq!(core.commit_max_retry_wait_ms(), 10_000); assert!(!core.row_tracking_enabled()); + assert_eq!(core.manifest_compression(), "zstd"); + assert_eq!(core.manifest_target_size(), 8 * 1024 * 1024); + assert_eq!(core.manifest_merge_min_count(), 30); } #[test] @@ -826,6 +906,12 @@ mod tests { (COMMIT_MIN_RETRY_WAIT_OPTION.to_string(), "500".to_string()), (COMMIT_MAX_RETRY_WAIT_OPTION.to_string(), "5000".to_string()), (ROW_TRACKING_ENABLED_OPTION.to_string(), "true".to_string()), + ( + MANIFEST_TARGET_FILE_SIZE_OPTION.to_string(), + "1kb".to_string(), + ), + (MANIFEST_COMPRESSION_OPTION.to_string(), "null".to_string()), + (MANIFEST_MERGE_MIN_COUNT_OPTION.to_string(), "3".to_string()), ]); let core = CoreOptions::new(&options); assert_eq!(core.bucket(), 4); @@ -834,6 +920,17 @@ mod tests { assert_eq!(core.commit_min_retry_wait_ms(), 500); assert_eq!(core.commit_max_retry_wait_ms(), 5_000); assert!(core.row_tracking_enabled()); + assert_eq!(core.manifest_compression(), "null"); + assert_eq!(core.manifest_target_size(), 1024); + assert_eq!(core.manifest_merge_min_count(), 3); + } + + #[test] + fn test_manifest_target_size_accepts_compat_alias() { + let options = HashMap::from([(MANIFEST_TARGET_SIZE_OPTION.to_string(), "2kb".into())]); + let core = CoreOptions::new(&options); + + assert_eq!(core.manifest_target_size(), 2 * 1024); } #[test] diff --git a/crates/paimon/src/spec/index_manifest.rs b/crates/paimon/src/spec/index_manifest.rs index cdafc522..b8ee2b52 100644 --- a/crates/paimon/src/spec/index_manifest.rs +++ b/crates/paimon/src/spec/index_manifest.rs @@ -141,7 +141,27 @@ impl IndexManifest { /// Write index manifest entries to a file. pub async fn write(file_io: &FileIO, path: &str, entries: &[IndexManifestEntry]) -> Result<()> { - let bytes = crate::spec::to_avro_bytes(INDEX_MANIFEST_ENTRY_SCHEMA, entries)?; + Self::write_with_compression( + file_io, + path, + entries, + crate::spec::DEFAULT_AVRO_COMPRESSION, + ) + .await + } + + /// Write index manifest entries with the configured Avro compression. + pub async fn write_with_compression( + file_io: &FileIO, + path: &str, + entries: &[IndexManifestEntry], + compression: &str, + ) -> Result<()> { + let bytes = crate::spec::to_avro_bytes_with_compression( + INDEX_MANIFEST_ENTRY_SCHEMA, + entries, + compression, + )?; let output = file_io.new_output(path)?; output.write(bytes::Bytes::from(bytes)).await } diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index 80463e41..c25dba9a 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -62,7 +62,27 @@ impl Manifest { /// Write manifest entries to a file. pub async fn write(file_io: &FileIO, path: &str, entries: &[ManifestEntry]) -> Result<()> { - let bytes = crate::spec::to_avro_bytes(MANIFEST_ENTRY_SCHEMA, entries)?; + Self::write_with_compression( + file_io, + path, + entries, + crate::spec::DEFAULT_AVRO_COMPRESSION, + ) + .await + } + + /// Write manifest entries to a file with the configured Avro compression. + pub async fn write_with_compression( + file_io: &FileIO, + path: &str, + entries: &[ManifestEntry], + compression: &str, + ) -> Result<()> { + let bytes = crate::spec::to_avro_bytes_with_compression( + MANIFEST_ENTRY_SCHEMA, + entries, + compression, + )?; let output = file_io.new_output(path)?; output.write(bytes::Bytes::from(bytes)).await } diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index 51bb81cf..540f4409 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -205,6 +205,15 @@ impl ManifestFileMeta { self } + /// Attach row-id range statistics aggregated from manifest entries. + #[inline] + #[must_use] + pub fn with_row_id_stats(mut self, min_row_id: Option, max_row_id: Option) -> Self { + self.min_row_id = min_row_id; + self.max_row_id = max_row_id; + self + } + #[inline] pub fn new( file_name: String, diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs index cf3985c8..6c59c684 100644 --- a/crates/paimon/src/spec/manifest_list.rs +++ b/crates/paimon/src/spec/manifest_list.rs @@ -38,7 +38,22 @@ impl ManifestList { /// Write manifest file metas to a manifest list file. pub async fn write(file_io: &FileIO, path: &str, metas: &[ManifestFileMeta]) -> Result<()> { - let bytes = crate::spec::to_avro_bytes(MANIFEST_FILE_META_SCHEMA, metas)?; + Self::write_with_compression(file_io, path, metas, crate::spec::DEFAULT_AVRO_COMPRESSION) + .await + } + + /// Write manifest file metas with the configured Avro compression. + pub async fn write_with_compression( + file_io: &FileIO, + path: &str, + metas: &[ManifestFileMeta], + compression: &str, + ) -> Result<()> { + let bytes = crate::spec::to_avro_bytes_with_compression( + MANIFEST_FILE_META_SCHEMA, + metas, + compression, + )?; let output = file_io.new_output(path)?; output.write(bytes::Bytes::from(bytes)).await } diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 88fec22f..765bb767 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -68,11 +68,16 @@ pub use manifest_common::FileKind; mod manifest_entry; pub use manifest_entry::Identifier; pub use manifest_entry::ManifestEntry; +pub(crate) use manifest_entry::MANIFEST_ENTRY_SCHEMA; mod manifest_list; pub use manifest_list::ManifestList; mod objects_file; pub use objects_file::from_avro_bytes; pub use objects_file::to_avro_bytes; +pub(crate) use objects_file::{ + new_avro_writer, to_avro_bytes_with_compression, DEFAULT_AVRO_BLOCK_SIZE, + DEFAULT_AVRO_COMPRESSION, +}; pub(crate) mod avro; pub(crate) mod stats; mod types; diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index 81e43779..2b16f55c 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. -use apache_avro::{from_value, to_value, Codec, Reader, Schema, Writer}; +use apache_avro::{from_value, to_value, Codec, Reader, Schema, Writer, ZstandardSettings}; use serde::de::DeserializeOwned; use serde::Serialize; +pub(crate) const DEFAULT_AVRO_BLOCK_SIZE: usize = 16_000; +pub(crate) const DEFAULT_AVRO_COMPRESSION: &str = "zstd"; + pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { Reader::new(bytes)? .map(|r| { @@ -33,15 +36,47 @@ pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result(schema_json: &str, records: &[T]) -> crate::Result> { + to_avro_bytes_with_compression(schema_json, records, DEFAULT_AVRO_COMPRESSION) +} + +pub(crate) fn to_avro_bytes_with_compression( + schema_json: &str, + records: &[T], + compression: &str, +) -> crate::Result> { let schema = Schema::parse_str(schema_json)?; - let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null); + let mut writer = new_avro_writer(&schema, compression, DEFAULT_AVRO_BLOCK_SIZE)?; for record in records { - let value = to_value(record).and_then(|v| v.resolve(&schema))?; + let value = to_value(record).and_then(|value| value.resolve(&schema))?; writer.append(value)?; } Ok(writer.into_inner()?) } +pub(crate) fn new_avro_writer<'a>( + schema: &'a Schema, + compression: &str, + block_size: usize, +) -> crate::Result>> { + Ok(Writer::builder() + .schema(schema) + .writer(Vec::new()) + .codec(avro_codec(compression)?) + .block_size(block_size.max(1)) + .build()) +} + +pub(crate) fn avro_codec(compression: &str) -> crate::Result { + match compression.to_ascii_lowercase().as_str() { + "zstd" | "zstandard" => Ok(Codec::Zstandard(ZstandardSettings::default())), + "null" | "none" | "uncompressed" => Ok(Codec::Null), + "snappy" => Ok(Codec::Snappy), + other => Err(crate::Error::Unsupported { + message: format!("Unsupported Avro compression: {other}"), + }), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 5ea930d3..652d58a1 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -64,6 +64,10 @@ pub struct Snapshot { #[builder(default = None)] #[serde(skip_serializing_if = "Option::is_none")] changelog_manifest_list: Option, + /// byte size of changelog manifest list file + #[builder(default = None)] + #[serde(default, skip_serializing_if = "Option::is_none")] + changelog_manifest_list_size: Option, /// a manifest recording all index files of this table #[builder(default = None)] #[serde(skip_serializing_if = "Option::is_none")] @@ -150,6 +154,12 @@ impl Snapshot { self.changelog_manifest_list.as_deref() } + /// Get the byte size of changelog manifest list of this snapshot. + #[inline] + pub fn changelog_manifest_list_size(&self) -> Option { + self.changelog_manifest_list_size + } + /// Get the index manifest of this snapshot. #[inline] pub fn index_manifest(&self) -> Option<&str> { diff --git a/crates/paimon/src/table/commit_message.rs b/crates/paimon/src/table/commit_message.rs index 88d35332..5a91e36b 100644 --- a/crates/paimon/src/table/commit_message.rs +++ b/crates/paimon/src/table/commit_message.rs @@ -29,10 +29,14 @@ pub struct CommitMessage { pub bucket: i32, /// New data files to be added. pub new_files: Vec, + /// Snapshot id from which row-id/column conflicts should be checked. + pub check_from_snapshot: Option, /// New changelog files to be added. pub new_changelog_files: Vec, /// New index files to be added (used by dynamic bucket mode). pub new_index_files: Vec, + /// Index files to be removed from the current index manifest. + pub deleted_index_files: Vec, /// Files to be deleted (copy-on-write rewrite: old files replaced by new_files). pub deleted_files: Vec, } @@ -43,8 +47,10 @@ impl CommitMessage { partition, bucket, new_files, + check_from_snapshot: None, new_changelog_files: Vec::new(), new_index_files: Vec::new(), + deleted_index_files: Vec::new(), deleted_files: Vec::new(), } } diff --git a/crates/paimon/src/table/data_evolution_writer.rs b/crates/paimon/src/table/data_evolution_writer.rs index 2a2862d0..3e123166 100644 --- a/crates/paimon/src/table/data_evolution_writer.rs +++ b/crates/paimon/src/table/data_evolution_writer.rs @@ -358,6 +358,7 @@ impl DataEvolutionWriter { file_range.partition.clone(), file_range.bucket, first_row_id, + file_range.snapshot_id, updated_batch, ) .await?; @@ -504,6 +505,7 @@ struct MatchedRow { /// Key: (partition_bytes, bucket, first_row_id) type WriterKey = (Vec, i32, i64); +type PartialCommitGroup = (Option, Vec); /// Writer for data evolution partial-column files. /// @@ -529,6 +531,7 @@ pub(crate) struct DataEvolutionPartialWriter { write_columns: Vec, /// Writers keyed by (partition_bytes, bucket, first_row_id). writers: HashMap, + check_from_snapshots: HashMap, } impl DataEvolutionPartialWriter { @@ -582,6 +585,7 @@ impl DataEvolutionPartialWriter { write_fields, write_columns, writers: HashMap::new(), + check_from_snapshots: HashMap::new(), }) } @@ -594,6 +598,7 @@ impl DataEvolutionPartialWriter { partition_bytes: Vec, bucket: i32, first_row_id: i64, + check_from_snapshot: i64, batch: RecordBatch, ) -> Result<()> { if batch.num_rows() == 0 { @@ -601,6 +606,10 @@ impl DataEvolutionPartialWriter { } let key = (partition_bytes.clone(), bucket, first_row_id); + self.check_from_snapshots + .entry(key.clone()) + .and_modify(|snapshot| *snapshot = (*snapshot).min(check_from_snapshot)) + .or_insert(check_from_snapshot); if !self.writers.contains_key(&key) { let partition_path = if self.partition_keys.is_empty() { String::new() @@ -635,32 +644,42 @@ impl DataEvolutionPartialWriter { /// Close all writers and collect CommitMessages for use with TableCommit. pub async fn prepare_commit(&mut self) -> Result> { let writers: Vec<(WriterKey, DataFileWriter)> = self.writers.drain().collect(); + let mut check_from_snapshots = std::mem::take(&mut self.check_from_snapshots); let futures: Vec<_> = writers .into_iter() - .map( - |((partition_bytes, bucket, _first_row_id), mut writer)| async move { + .map(|(key, mut writer)| { + let check_from_snapshot = check_from_snapshots.remove(&key); + async move { let files = writer.prepare_commit().await?; - Ok::<_, crate::Error>((partition_bytes, bucket, files)) - }, - ) + let (partition_bytes, bucket, _first_row_id) = key; + Ok::<_, crate::Error>((partition_bytes, bucket, check_from_snapshot, files)) + } + }) .collect(); let results = futures::future::try_join_all(futures).await?; // Group files by (partition, bucket) since multiple first_row_ids may share the same partition/bucket - let mut grouped: HashMap<(Vec, i32), Vec> = HashMap::new(); - for (partition_bytes, bucket, files) in results { - grouped + let mut grouped: HashMap<(Vec, i32), PartialCommitGroup> = HashMap::new(); + for (partition_bytes, bucket, check_from_snapshot, files) in results { + let entry = grouped .entry((partition_bytes, bucket)) - .or_default() - .extend(files); + .or_insert_with(|| (None, Vec::new())); + if let Some(check_from_snapshot) = check_from_snapshot { + entry.0 = Some(entry.0.map_or(check_from_snapshot, |snapshot| { + snapshot.min(check_from_snapshot) + })); + } + entry.1.extend(files); } let mut messages = Vec::new(); - for ((partition_bytes, bucket), files) in grouped { + for ((partition_bytes, bucket), (check_from_snapshot, files)) in grouped { if !files.is_empty() { - messages.push(CommitMessage::new(partition_bytes, bucket, files)); + let mut message = CommitMessage::new(partition_bytes, bucket, files); + message.check_from_snapshot = check_from_snapshot; + messages.push(message); } } Ok(messages) @@ -803,13 +822,14 @@ mod tests { let batch = make_partial_batch(vec!["alice", "bob", "charlie"]); writer - .write_partial_batch(vec![], 0, 0, batch) + .write_partial_batch(vec![], 0, 0, 7, batch) .await .unwrap(); let messages = writer.prepare_commit().await.unwrap(); assert_eq!(messages.len(), 1); assert_eq!(messages[0].new_files.len(), 1); + assert_eq!(messages[0].check_from_snapshot, Some(7)); let meta = &messages[0].new_files[0]; assert_eq!(meta.row_count, 3); @@ -830,19 +850,20 @@ mod tests { // Two batches with different first_row_id should produce two files let batch1 = make_partial_batch(vec!["alice", "bob"]); writer - .write_partial_batch(vec![], 0, 0, batch1) + .write_partial_batch(vec![], 0, 0, 9, batch1) .await .unwrap(); let batch2 = make_partial_batch(vec!["charlie"]); writer - .write_partial_batch(vec![], 0, 100, batch2) + .write_partial_batch(vec![], 0, 100, 8, batch2) .await .unwrap(); let messages = writer.prepare_commit().await.unwrap(); assert_eq!(messages.len(), 1); assert_eq!(messages[0].new_files.len(), 2); + assert_eq!(messages[0].check_from_snapshot, Some(8)); let mut files = messages[0].new_files.clone(); files.sort_by_key(|f| f.first_row_id); diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs index 938a0239..055dcd58 100644 --- a/crates/paimon/src/table/table_commit.rs +++ b/crates/paimon/src/table/table_commit.rs @@ -24,21 +24,29 @@ use crate::io::FileIO; use crate::spec::stats::BinaryTableStats; use crate::spec::FileKind; use crate::spec::{ - datums_to_binary_row, extract_datum, BinaryRow, BinaryRowBuilder, CommitKind, CoreOptions, - DataType, Datum, IndexManifest, IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, - ManifestList, PartitionStatistics, Snapshot, + bucket_dir_name, extract_datum, merge_active_entries, BinaryRow, BinaryRowBuilder, CommitKind, + CoreOptions, DataFileMeta, DataType, Datum, GlobalIndexColumnUpdateAction, IndexManifest, + IndexManifestEntry, Manifest, ManifestEntry, ManifestFileMeta, ManifestList, PartitionComputer, + PartitionStatistics, Predicate, Snapshot, EMPTY_SERIALIZED_ROW, MANIFEST_ENTRY_SCHEMA, }; use crate::table::commit_message::CommitMessage; use crate::table::partition_filter::PartitionFilter; use crate::table::snapshot_commit::SnapshotCommit; use crate::table::{SnapshotManager, Table, TableScan}; use crate::Result; +use apache_avro::{to_value, Schema}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; /// Batch commit identifier (i64::MAX), same as Python's BATCH_COMMIT_IDENTIFIER. const BATCH_COMMIT_IDENTIFIER: i64 = i64::MAX; +/// Java RollingFileWriter.CHECK_ROLLING_RECORD_CNT. +const CHECK_ROLLING_RECORD_COUNT: usize = 1000; + +type PartitionBucketKey = (Vec, i32); +type RowIdRange = (i64, i64); +type ExistingRowIdRanges = HashMap>; /// Table commit logic for Paimon write operations. /// @@ -54,7 +62,11 @@ pub struct TableCommit { commit_timeout_ms: u64, commit_min_retry_wait_ms: u64, commit_max_retry_wait_ms: u64, + manifest_compression: String, + manifest_target_size: i64, + manifest_merge_min_count: usize, row_tracking_enabled: bool, + data_evolution_enabled: bool, partition_default_name: String, } @@ -74,7 +86,11 @@ impl TableCommit { let commit_timeout_ms = core_options.commit_timeout_ms(); let commit_min_retry_wait_ms = core_options.commit_min_retry_wait_ms(); let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms(); + let manifest_compression = core_options.manifest_compression().to_string(); + let manifest_target_size = core_options.manifest_target_size(); + let manifest_merge_min_count = core_options.manifest_merge_min_count(); let row_tracking_enabled = core_options.row_tracking_enabled(); + let data_evolution_enabled = core_options.data_evolution_enabled(); let partition_default_name = core_options.partition_default_name().to_string(); Self { table, @@ -86,13 +102,30 @@ impl TableCommit { commit_timeout_ms, commit_min_retry_wait_ms, commit_max_retry_wait_ms, + manifest_compression, + manifest_target_size, + manifest_merge_min_count, row_tracking_enabled, + data_evolution_enabled, partition_default_name, } } /// Commit new files in APPEND mode. pub async fn commit(&self, commit_messages: Vec) -> Result<()> { + self.commit_with_identifier(commit_messages, BATCH_COMMIT_IDENTIFIER) + .await + } + + /// Commit new files with a caller-provided commit identifier. + /// + /// The identifier participates in retry idempotency, matching Python + /// `FileStoreCommit.commit(commit_messages, commit_identifier)`. + pub async fn commit_with_identifier( + &self, + commit_messages: Vec, + commit_identifier: i64, + ) -> Result<()> { if commit_messages.is_empty() { return Ok(()); } @@ -100,13 +133,16 @@ impl TableCommit { let entries = self.messages_to_entries(&commit_messages); let changelog_entries = self.messages_to_changelog_entries(&commit_messages); let new_index_entries = self.messages_to_index_entries(&commit_messages); + let check_from_snapshot = Self::min_check_from_snapshot(&commit_messages); self.try_commit( CommitEntriesPlan::Direct { entries, changelog_entries, new_index_entries, + check_from_snapshot, }, None, + commit_identifier, ) .await } @@ -115,6 +151,20 @@ impl TableCommit { &self, commit_messages: Vec, expected_snapshot_id: i64, + ) -> Result<()> { + self.commit_if_latest_snapshot_with_identifier( + commit_messages, + expected_snapshot_id, + BATCH_COMMIT_IDENTIFIER, + ) + .await + } + + pub(crate) async fn commit_if_latest_snapshot_with_identifier( + &self, + commit_messages: Vec, + expected_snapshot_id: i64, + commit_identifier: i64, ) -> Result<()> { if commit_messages.is_empty() { return Ok(()); @@ -123,13 +173,16 @@ impl TableCommit { let entries = self.messages_to_entries(&commit_messages); let changelog_entries = self.messages_to_changelog_entries(&commit_messages); let new_index_entries = self.messages_to_index_entries(&commit_messages); + let check_from_snapshot = Self::min_check_from_snapshot(&commit_messages); self.try_commit( CommitEntriesPlan::Direct { entries, changelog_entries, new_index_entries, + check_from_snapshot, }, Some(expected_snapshot_id), + commit_identifier, ) .await } @@ -150,6 +203,17 @@ impl TableCommit { &self, commit_messages: Vec, static_partitions: Option>>, + ) -> Result<()> { + self.overwrite_with_identifier(commit_messages, static_partitions, BATCH_COMMIT_IDENTIFIER) + .await + } + + /// Overwrite partitions with a caller-provided commit identifier. + pub async fn overwrite_with_identifier( + &self, + commit_messages: Vec, + static_partitions: Option>>, + commit_identifier: i64, ) -> Result<()> { if commit_messages.is_empty() && static_partitions.is_none() { return Ok(()); @@ -160,34 +224,36 @@ impl TableCommit { let has_new_data_entries = new_entries .iter() .any(|entry| *entry.kind() == FileKind::Add); + let has_static_partitions = static_partitions.is_some(); let partition_filter = if let Some(sp) = static_partitions { - let partition_keys = self.table.schema().partition_keys(); + self.validate_partition_spec_keys(&sp)?; let partition_fields = self.table.schema().partition_fields(); - let is_full_spec = partition_keys.iter().all(|k| sp.contains_key(k)); - - if is_full_spec { - let bytes = self.partitions_to_bytes(&[sp]); - Some(PartitionFilter::from_partition_set( - bytes, - &partition_fields, - )?) - } else { - Some(self.build_static_partition_predicate(&sp, &partition_fields)?) - } + Some(self.build_static_partition_predicate(&sp, &partition_fields)?) } else if !self.table.schema().partition_fields().is_empty() && !has_new_data_entries { return Ok(()); } else { self.build_dynamic_partition_filter(&new_entries)? }; + if has_static_partitions { + if let Some(filter) = partition_filter.as_ref() { + self.validate_static_overwrite_entries(filter, &new_entries)?; + } + } + self.try_commit( CommitEntriesPlan::Overwrite { partition_filter, new_entries, new_index_entries, + cached_snapshot: None, + cached_entries: Vec::new(), + full_scan_count: 0, + delta_probe_count: 0, }, None, + commit_identifier, ) .await } @@ -199,22 +265,14 @@ impl TableCommit { partition_fields: &[crate::spec::DataField], ) -> Result { use crate::spec::PredicateBuilder; - let pb = PredicateBuilder::new(partition_fields); - let mut predicates = Vec::new(); - for (key, value) in static_partitions { - // Currently all values from parse_static_partitions are Some; - // None would represent an explicit NULL partition value. - let pred = match value { - Some(datum) => pb.equal(key, datum.clone())?, - None => pb.is_null(key)?, - }; - predicates.push(pred); + if static_partitions.is_empty() { + return Ok(PartitionFilter::from_predicate( + Predicate::AlwaysTrue, + partition_fields, + )); } - let combined = if predicates.len() == 1 { - predicates.into_iter().next().unwrap() - } else { - crate::spec::Predicate::and(predicates) - }; + let pb = PredicateBuilder::new(partition_fields); + let combined = self.partition_spec_predicate(&pb, static_partitions)?; Ok(PartitionFilter::from_predicate(combined, partition_fields)) } @@ -268,84 +326,275 @@ impl TableCommit { )?)) } + fn build_partition_filter_from_specs( + &self, + partitions: &[HashMap>], + ) -> Result { + let partition_fields = self.table.schema().partition_fields(); + if partition_fields.is_empty() { + return Err(crate::Error::DataInvalid { + message: "Cannot drop partitions from an unpartitioned table.".to_string(), + source: None, + }); + } + + use crate::spec::PredicateBuilder; + let pb = PredicateBuilder::new(&partition_fields); + let mut predicates = Vec::new(); + for partition in partitions { + self.validate_partition_spec_keys(partition)?; + if !partition.is_empty() { + predicates.push(self.partition_spec_predicate(&pb, partition)?); + } + } + + if predicates.is_empty() { + return Err(crate::Error::DataInvalid { + message: "Failed to build partition filter for drop_partitions.".to_string(), + source: None, + }); + } + + Ok(PartitionFilter::from_predicate( + Predicate::or(predicates), + &partition_fields, + )) + } + + fn partition_spec_predicate( + &self, + pb: &crate::spec::PredicateBuilder, + partition: &HashMap>, + ) -> Result { + let predicates = partition + .iter() + .map(|(key, value)| self.partition_value_predicate(pb, key, value)) + .collect::>>()?; + Ok(Predicate::and(predicates)) + } + + fn partition_value_predicate( + &self, + pb: &crate::spec::PredicateBuilder, + key: &str, + value: &Option, + ) -> Result { + match value { + None => pb.is_null(key), + Some(Datum::String(value)) if value == &self.partition_default_name => pb.is_null(key), + Some(datum) => pb.equal(key, datum.clone()), + } + } + + fn validate_partition_spec_keys( + &self, + partition: &HashMap>, + ) -> Result<()> { + let partition_keys: HashSet<&str> = self + .table + .schema() + .partition_keys() + .iter() + .map(String::as_str) + .collect(); + for key in partition.keys() { + if !partition_keys.contains(key.as_str()) { + return Err(crate::Error::DataInvalid { + message: format!( + "Partition spec key '{key}' is not a partition column. Partition keys are: {:?}.", + self.table.schema().partition_keys() + ), + source: None, + }); + } + } + Ok(()) + } + + fn validate_static_overwrite_entries( + &self, + partition_filter: &PartitionFilter, + new_entries: &[ManifestEntry], + ) -> Result<()> { + for entry in new_entries { + if *entry.kind() == FileKind::Add + && !partition_filter.matches_entry(entry.partition())? + { + return Err(crate::Error::DataInvalid { + message: format!( + "Trying to overwrite static partition, but file '{}' in bucket {} does not belong to this partition.", + entry.file().file_name, + entry.bucket(), + ), + source: None, + }); + } + } + Ok(()) + } + /// Drop specific partitions (OVERWRITE with only deletes). pub async fn truncate_partitions( &self, partitions: Vec>>, + ) -> Result<()> { + self.truncate_partitions_with_identifier(partitions, BATCH_COMMIT_IDENTIFIER) + .await + } + + /// Drop specific partitions with a caller-provided commit identifier. + pub async fn truncate_partitions_with_identifier( + &self, + partitions: Vec>>, + commit_identifier: i64, ) -> Result<()> { if partitions.is_empty() { return Ok(()); } - let partition_fields = self.table.schema().partition_fields(); - let partition_filter = PartitionFilter::from_partition_set( - self.partitions_to_bytes(&partitions), - &partition_fields, - )?; + let partition_filter = self.build_partition_filter_from_specs(&partitions)?; self.try_commit( CommitEntriesPlan::Overwrite { partition_filter: Some(partition_filter), new_entries: vec![], new_index_entries: vec![], + cached_snapshot: None, + cached_entries: Vec::new(), + full_scan_count: 0, + delta_probe_count: 0, }, None, + commit_identifier, ) .await } - fn partitions_to_bytes( + /// Python-compatible alias for dropping partitions. + pub async fn drop_partitions( &self, - partitions: &[HashMap>], - ) -> HashSet> { - let partition_fields = self.table.schema().partition_fields(); - let partition_keys = self.table.schema().partition_keys(); - partitions - .iter() - .map(|p| { - let owned_datums: Vec<(Option, DataType)> = partition_keys - .iter() - .enumerate() - .map(|(i, key)| { - let datum = p.get(key).cloned().flatten(); - let dt = partition_fields[i].data_type().clone(); - (datum, dt) - }) - .collect(); - let refs: Vec<(&Option, &DataType)> = - owned_datums.iter().map(|(d, t)| (d, t)).collect(); - datums_to_binary_row(&refs) - }) - .collect() + partitions: Vec>>, + ) -> Result<()> { + self.drop_partitions_with_identifier(partitions, BATCH_COMMIT_IDENTIFIER) + .await + } + + /// Python-compatible alias for dropping partitions with a caller-provided + /// commit identifier. Unlike `truncate_partitions`, an empty partition list + /// is rejected just like `FileStoreCommit.drop_partitions`. + pub async fn drop_partitions_with_identifier( + &self, + partitions: Vec>>, + commit_identifier: i64, + ) -> Result<()> { + if partitions.is_empty() { + return Err(crate::Error::DataInvalid { + message: "Partitions list cannot be empty.".to_string(), + source: None, + }); + } + self.truncate_partitions_with_identifier(partitions, commit_identifier) + .await } /// Truncate the entire table (OVERWRITE with no filter, only deletes). pub async fn truncate_table(&self) -> Result<()> { + self.truncate_table_with_identifier(BATCH_COMMIT_IDENTIFIER) + .await + } + + /// Truncate the entire table with a caller-provided commit identifier. + pub async fn truncate_table_with_identifier(&self, commit_identifier: i64) -> Result<()> { self.try_commit( CommitEntriesPlan::Overwrite { partition_filter: None, new_entries: vec![], new_index_entries: vec![], + cached_snapshot: None, + cached_entries: Vec::new(), + full_scan_count: 0, + delta_probe_count: 0, }, None, + commit_identifier, ) .await } + /// Abort a prepared commit by deleting newly written data and changelog files. + /// + /// Deletion is best-effort and mirrors Python `FileStoreCommit.abort`: missing + /// files or storage errors are ignored so abort cleanup never masks the + /// original write failure. + pub async fn abort(&self, commit_messages: &[CommitMessage]) -> Result<()> { + for message in commit_messages { + let bucket_path = self.bucket_path(&message.partition, message.bucket)?; + for file in message + .new_files + .iter() + .chain(message.new_changelog_files.iter()) + { + for path in file.collect_files(&bucket_path) { + let _ = self.table.file_io().delete_file(&path).await; + } + } + } + Ok(()) + } + + fn bucket_path(&self, partition: &[u8], bucket: i32) -> Result { + let base = self.table.location().trim_end_matches('/'); + let partition_keys = self.table.schema().partition_keys(); + if partition_keys.is_empty() { + return Ok(format!("{base}/{}", bucket_dir_name(bucket))); + } + + let partition_row = BinaryRow::from_serialized_bytes(partition)?; + let core_options = CoreOptions::new(self.table.schema().options()); + let computer = PartitionComputer::new( + partition_keys, + self.table.schema().fields(), + core_options.partition_default_name(), + core_options.legacy_partition_name(), + )?; + Ok(format!( + "{base}/{}{}", + computer.generate_partition_path(&partition_row)?, + bucket_dir_name(bucket) + )) + } + /// Try to commit with retries. async fn try_commit( &self, - plan: CommitEntriesPlan, + mut plan: CommitEntriesPlan, expected_snapshot_id: Option, + commit_identifier: i64, ) -> Result<()> { let mut retry_count = 0u32; - let mut last_snapshot_for_dup_check: Option = None; + let mut duplicate_check_start_snapshot_id: Option = None; + let mut retry_state: Option> = None; let start_time_ms = current_time_millis(); loop { let latest_snapshot = self.snapshot_manager.get_latest_snapshot().await?; + if let Some(start_snapshot_id) = duplicate_check_start_snapshot_id { + if self + .is_duplicate_commit( + start_snapshot_id, + &latest_snapshot, + commit_identifier, + &plan.commit_kind_hint(), + ) + .await + { + break; + } + } validate_expected_latest_snapshot(expected_snapshot_id, &latest_snapshot)?; - let resolved = self.resolve_commit(&plan, &latest_snapshot).await?; + let resolved = self + .resolve_commit(&mut plan, &latest_snapshot, retry_state.as_deref()) + .await?; if resolved.entries.is_empty() && resolved.changelog_entries.is_empty() @@ -354,33 +603,23 @@ impl TableCommit { break; } - // Check for duplicate commit (idempotency on retry) - if self - .is_duplicate_commit( - &last_snapshot_for_dup_check, - &latest_snapshot, - &resolved.kind, - ) - .await - { - break; - } - - let result = self.try_commit_once(resolved, &latest_snapshot).await?; + let result = self + .try_commit_once(resolved, &latest_snapshot, commit_identifier) + .await?; match result { - true => break, - false => { - last_snapshot_for_dup_check = latest_snapshot; + CommitAttemptResult::Success => break, + CommitAttemptResult::Retry(state) => { + duplicate_check_start_snapshot_id.get_or_insert_with(|| { + latest_snapshot.as_ref().map(|s| s.id() + 1).unwrap_or(1) + }); + retry_state = Some(state); } } let elapsed_ms = current_time_millis() - start_time_ms; if elapsed_ms > self.commit_timeout_ms || retry_count >= self.commit_max_retries { - let snap_id = last_snapshot_for_dup_check - .as_ref() - .map(|s| s.id() + 1) - .unwrap_or(1); + let snap_id = duplicate_check_start_snapshot_id.unwrap_or(1); return Err(crate::Error::DataInvalid { message: format!( "Commit failed for snapshot {} after {} millis with {} retries, \ @@ -403,7 +642,8 @@ impl TableCommit { &self, mut resolved: ResolvedCommit, latest_snapshot: &Option, - ) -> Result { + commit_identifier: i64, + ) -> Result { let new_snapshot_id = latest_snapshot.as_ref().map(|s| s.id() + 1).unwrap_or(1); // Row tracking @@ -423,7 +663,7 @@ impl TableCommit { new_snapshot_id, first_row_id_start, resolved.entries, - ); + )?; resolved.entries = assigned; next_row_id = Some(nrid); } @@ -436,58 +676,63 @@ impl TableCommit { let base_manifest_list_name = format!("manifest-list-{unique_id}-0"); let delta_manifest_list_name = format!("manifest-list-{unique_id}-1"); let changelog_manifest_list_name = format!("manifest-list-{unique_id}-2"); - let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); - let changelog_manifest_name = format!("manifest-{}-1", uuid::Uuid::new_v4()); + let new_manifest_prefix = format!("manifest-{}", uuid::Uuid::new_v4()); + let changelog_manifest_prefix = format!("manifest-{}-changelog", uuid::Uuid::new_v4()); let base_manifest_list_path = format!("{manifest_dir}/{base_manifest_list_name}"); let delta_manifest_list_path = format!("{manifest_dir}/{delta_manifest_list_name}"); let changelog_manifest_list_path = format!("{manifest_dir}/{changelog_manifest_list_name}"); - let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}"); - let changelog_manifest_path = format!("{manifest_dir}/{changelog_manifest_name}"); - // Write manifest file - let new_manifest_file_meta = self - .write_manifest_file( + // Write delta manifest files, rolling by target size. + let new_manifest_file_metas = self + .write_manifest_files( file_io, - &new_manifest_path, - &new_manifest_name, + &manifest_dir, + &new_manifest_prefix, &resolved.entries, ) .await?; // Write delta manifest list - ManifestList::write( + ManifestList::write_with_compression( file_io, &delta_manifest_list_path, - &[new_manifest_file_meta], + &new_manifest_file_metas, + &self.manifest_compression, ) .await?; - let changelog_record_count = if resolved.changelog_entries.is_empty() { - None - } else { - let changelog_manifest_file_meta = self - .write_manifest_file( + let (changelog_record_count, changelog_manifest_list_size) = + if resolved.changelog_entries.is_empty() { + (None, None) + } else { + let changelog_manifest_file_metas = self + .write_manifest_files( + file_io, + &manifest_dir, + &changelog_manifest_prefix, + &resolved.changelog_entries, + ) + .await?; + ManifestList::write_with_compression( file_io, - &changelog_manifest_path, - &changelog_manifest_name, - &resolved.changelog_entries, + &changelog_manifest_list_path, + &changelog_manifest_file_metas, + &self.manifest_compression, ) .await?; - ManifestList::write( - file_io, - &changelog_manifest_list_path, - &[changelog_manifest_file_meta], - ) - .await?; - Some( - resolved - .changelog_entries - .iter() - .map(|entry| entry.file().row_count) - .sum(), - ) - }; + let status = file_io.get_status(&changelog_manifest_list_path).await?; + ( + Some( + resolved + .changelog_entries + .iter() + .map(|entry| entry.file().row_count) + .sum(), + ), + Some(status.size as i64), + ) + }; // Read existing manifests (base + delta from previous snapshot) and write base manifest list let mut total_record_count: i64 = 0; @@ -506,7 +751,17 @@ impl TableCommit { vec![] }; - ManifestList::write(file_io, &base_manifest_list_path, &existing_manifest_files).await?; + let (base_manifest_files, _merge_new_files) = self + .merge_manifest_files(file_io, &manifest_dir, existing_manifest_files) + .await?; + + ManifestList::write_with_compression( + file_io, + &base_manifest_list_path, + &base_manifest_files, + &self.manifest_compression, + ) + .await?; // Calculate delta record count let mut delta_record_count: i64 = 0; @@ -525,12 +780,13 @@ impl TableCommit { .base_manifest_list(base_manifest_list_name) .delta_manifest_list(delta_manifest_list_name) .commit_user(self.commit_user.clone()) - .commit_identifier(BATCH_COMMIT_IDENTIFIER) + .commit_identifier(commit_identifier) .commit_kind(resolved.kind) .time_millis(current_time_millis()) .total_record_count(Some(total_record_count)) .delta_record_count(Some(delta_record_count)) .changelog_manifest_list(changelog_record_count.map(|_| changelog_manifest_list_name)) + .changelog_manifest_list_size(changelog_manifest_list_size) .changelog_record_count(changelog_record_count) .next_row_id(next_row_id) .index_manifest(resolved.index_manifest_name) @@ -538,13 +794,21 @@ impl TableCommit { let statistics = self.generate_partition_statistics(&resolved.entries)?; - self.snapshot_commit.commit(&snapshot, &statistics).await + if self.snapshot_commit.commit(&snapshot, &statistics).await? { + Ok(CommitAttemptResult::Success) + } else { + Ok(CommitAttemptResult::Retry(Box::new(RetryState { + latest_snapshot: latest_snapshot.clone(), + base_data_files: resolved.base_data_files.take(), + }))) + } } /// Write an index manifest file from already-merged entries. /// /// Returns `None` if `merged_index_entries` is empty. async fn write_index_manifest( + &self, file_io: &FileIO, manifest_dir: &str, merged_index_entries: &[IndexManifestEntry], @@ -554,19 +818,184 @@ impl TableCommit { } let name = format!("index-manifest-{}-0", uuid::Uuid::new_v4()); let path = format!("{manifest_dir}/{name}"); - IndexManifest::write(file_io, &path, merged_index_entries).await?; + IndexManifest::write_with_compression( + file_io, + &path, + merged_index_entries, + &self.manifest_compression, + ) + .await?; Ok(Some(name)) } - /// Write a manifest file and return its metadata. - async fn write_manifest_file( + /// Write manifest files, rolling by configured target size, and return their metadata. + async fn write_manifest_files( + &self, + file_io: &FileIO, + manifest_dir: &str, + name_prefix: &str, + entries: &[ManifestEntry], + ) -> Result> { + if entries.is_empty() { + return Ok(vec![]); + } + + let target_size = self.manifest_target_size.max(1) as usize; + let mut result = Vec::new(); + let mut chunk_start = 0usize; + let schema = Schema::parse_str(MANIFEST_ENTRY_SCHEMA)?; + let mut writer = crate::spec::new_avro_writer( + &schema, + &self.manifest_compression, + crate::spec::DEFAULT_AVRO_BLOCK_SIZE, + )?; + + for (idx, entry) in entries.iter().enumerate() { + let value = to_value(entry).and_then(|value| value.resolve(&schema))?; + writer.append(value)?; + let record_count = idx + 1; + if record_count % CHECK_ROLLING_RECORD_COUNT == 0 + && writer.get_ref().len() >= target_size + { + let chunk_end = idx + 1; + let file_name = format!("{name_prefix}-{}", result.len()); + let path = format!("{manifest_dir}/{file_name}"); + let bytes = writer.into_inner()?; + let meta = self + .write_manifest_file_bytes( + file_io, + &path, + &file_name, + &entries[chunk_start..chunk_end], + bytes, + ) + .await?; + result.push(meta); + chunk_start = chunk_end; + writer = crate::spec::new_avro_writer( + &schema, + &self.manifest_compression, + crate::spec::DEFAULT_AVRO_BLOCK_SIZE, + )?; + } + } + + if chunk_start < entries.len() { + let file_name = format!("{name_prefix}-{}", result.len()); + let path = format!("{manifest_dir}/{file_name}"); + let bytes = writer.into_inner()?; + let meta = self + .write_manifest_file_bytes( + file_io, + &path, + &file_name, + &entries[chunk_start..], + bytes, + ) + .await?; + result.push(meta); + } + + Ok(result) + } + + /// Minor-compact existing manifest files before writing the base manifest list. + async fn merge_manifest_files( + &self, + file_io: &FileIO, + manifest_dir: &str, + manifest_files: Vec, + ) -> Result<(Vec, Vec)> { + if manifest_files.is_empty() { + return Ok((vec![], vec![])); + } + + let target_size = self.manifest_target_size.max(1); + let mut result = Vec::new(); + let mut new_files = Vec::new(); + let mut candidates = Vec::new(); + let mut total_size = 0i64; + + for manifest in manifest_files { + total_size += manifest.file_size(); + candidates.push(manifest); + if total_size >= target_size { + self.merge_manifest_candidates( + file_io, + manifest_dir, + &mut candidates, + &mut result, + &mut new_files, + ) + .await?; + total_size = 0; + } + } + + if candidates.len() >= self.manifest_merge_min_count { + self.merge_manifest_candidates( + file_io, + manifest_dir, + &mut candidates, + &mut result, + &mut new_files, + ) + .await?; + } else { + result.append(&mut candidates); + } + + Ok((result, new_files)) + } + + async fn merge_manifest_candidates( + &self, + file_io: &FileIO, + manifest_dir: &str, + candidates: &mut Vec, + result: &mut Vec, + new_files: &mut Vec, + ) -> Result<()> { + if candidates.is_empty() { + return Ok(()); + } + if candidates.len() == 1 { + result.append(candidates); + return Ok(()); + } + + let mut entries = Vec::new(); + for manifest in candidates.drain(..) { + let path = format!("{manifest_dir}/{}", manifest.file_name()); + entries.extend(Manifest::read(file_io, &path).await?); + } + + let merged_entries = merge_active_entries(entries); + if merged_entries.is_empty() { + return Ok(()); + } + + let manifest_prefix = format!("manifest-{}", uuid::Uuid::new_v4()); + let merged_metas = self + .write_manifest_files(file_io, manifest_dir, &manifest_prefix, &merged_entries) + .await?; + result.extend(merged_metas.clone()); + new_files.extend(merged_metas); + Ok(()) + } + + /// Write already-encoded manifest bytes and return metadata for the corresponding entries. + async fn write_manifest_file_bytes( &self, file_io: &FileIO, path: &str, file_name: &str, entries: &[ManifestEntry], + bytes: Vec, ) -> Result { - Manifest::write(file_io, path, entries).await?; + let file_size = bytes.len() as i64; + let output = file_io.new_output(path)?; + output.write(bytes::Bytes::from(bytes)).await?; let mut added_file_count: i64 = 0; let mut deleted_file_count: i64 = 0; @@ -577,47 +1006,63 @@ impl TableCommit { let mut max_bucket: Option = None; let mut min_level: Option = None; let mut max_level: Option = None; + let mut min_row_id: Option = None; + let mut max_row_id: Option = None; + let mut all_entries_have_row_id = !entries.is_empty(); + let mut schema_id = self.table.schema().id(); for entry in entries { match entry.kind() { FileKind::Add => added_file_count += 1, FileKind::Delete => deleted_file_count += 1, } + schema_id = schema_id.max(entry.file().schema_id); let b = entry.bucket(); min_bucket = Some(min_bucket.map_or(b, |cur| cur.min(b))); max_bucket = Some(max_bucket.map_or(b, |cur| cur.max(b))); let l = entry.file().level; min_level = Some(min_level.map_or(l, |cur| cur.min(l))); max_level = Some(max_level.map_or(l, |cur| cur.max(l))); + if let Some((start, end)) = entry.file().row_id_range() { + min_row_id = Some(min_row_id.map_or(start, |cur| cur.min(start))); + max_row_id = Some(max_row_id.map_or(end, |cur| cur.max(end))); + } else { + all_entries_have_row_id = false; + } + } + if !all_entries_have_row_id { + min_row_id = None; + max_row_id = None; } - - // Get file size - let status = file_io.get_status(path).await?; let partition_stats = self.compute_partition_stats(entries)?; Ok(ManifestFileMeta::new( file_name.to_string(), - status.size as i64, + file_size, added_file_count, deleted_file_count, partition_stats, - self.table.schema().id(), + schema_id, ) - .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level)) + .with_bucket_level_stats(min_bucket, max_bucket, min_level, max_level) + .with_row_id_stats(min_row_id, max_row_id)) } /// Check if this commit was already completed (idempotency). async fn is_duplicate_commit( &self, - last_snapshot_for_dup_check: &Option, + start_snapshot_id: i64, latest_snapshot: &Option, + commit_identifier: i64, commit_kind: &CommitKind, ) -> bool { - if let (Some(prev_snap), Some(latest)) = (last_snapshot_for_dup_check, latest_snapshot) { - let start_id = prev_snap.id() + 1; - for snapshot_id in start_id..=latest.id() { + if let Some(latest) = latest_snapshot { + for snapshot_id in start_snapshot_id..=latest.id() { if let Ok(snap) = self.snapshot_manager.get_snapshot(snapshot_id).await { - if snap.commit_user() == self.commit_user && snap.commit_kind() == commit_kind { + if snap.commit_user() == self.commit_user + && snap.commit_identifier() == commit_identifier + && snap.commit_kind() == commit_kind + { return true; } } @@ -629,8 +1074,9 @@ impl TableCommit { /// Resolve commit entries and merge index entries based on the plan type. async fn resolve_commit( &self, - plan: &CommitEntriesPlan, + plan: &mut CommitEntriesPlan, latest_snapshot: &Option, + retry_state: Option<&RetryState>, ) -> Result { let file_io = self.snapshot_manager.file_io(); let manifest_dir = self.snapshot_manager.manifest_dir(); @@ -640,13 +1086,8 @@ impl TableCommit { entries, changelog_entries, new_index_entries, + check_from_snapshot, } => { - if self.row_tracking_enabled { - self.validate_row_id_alignment(entries, latest_snapshot) - .await?; - } - self.validate_deleted_files(entries, latest_snapshot) - .await?; // Auto-promote to OVERWRITE when CoW rewrites produce Delete entries. // This ensures the snapshot correctly reflects file replacements. let has_delete = entries.iter().any(|e| *e.kind() == FileKind::Delete); @@ -655,19 +1096,39 @@ impl TableCommit { } else { CommitKind::APPEND }; - - let previous = - Self::read_prev_index_entries(file_io, &manifest_dir, latest_snapshot).await?; - let drop_previous_global_indexes = - !entries.is_empty() || !changelog_entries.is_empty(); - let all = Self::merge_index_entries( - &previous, + let detect_conflicts = has_delete || check_from_snapshot.is_some(); + let base_data_files = if detect_conflicts { + self.detect_commit_conflicts( + latest_snapshot, + retry_state, + entries, + &kind, + *check_from_snapshot, + ) + .await? + } else { + if self.row_tracking_enabled { + self.validate_row_id_alignment(entries, latest_snapshot) + .await?; + } + self.validate_deleted_files(entries, latest_snapshot) + .await?; + None + }; + + let previous = + Self::read_prev_index_entries(file_io, &manifest_dir, latest_snapshot).await?; + let mut index_entries = new_index_entries.clone(); + index_entries.extend(self.global_index_update_entries( + &previous, + entries, new_index_entries, - drop_previous_global_indexes, - )?; + )?); + let all = Self::merge_index_entries(&previous, &index_entries, false)?; let index_manifest_changed = all != previous; let index_manifest_name = if index_manifest_changed { - Self::write_index_manifest(file_io, &manifest_dir, &all).await? + self.write_index_manifest(file_io, &manifest_dir, &all) + .await? } else { latest_snapshot .as_ref() @@ -680,18 +1141,28 @@ impl TableCommit { kind, index_manifest_name, index_manifest_changed, + base_data_files, }) } - CommitEntriesPlan::Overwrite { - partition_filter, - new_entries, - new_index_entries, - } => { + CommitEntriesPlan::Overwrite { .. } => { let entries = self - .generate_overwrite_entries( + .provide_overwrite_entries(plan, latest_snapshot) + .await?; + let (partition_filter, new_index_entries) = match plan { + CommitEntriesPlan::Overwrite { + partition_filter, + new_index_entries, + .. + } => (partition_filter.clone(), new_index_entries.clone()), + CommitEntriesPlan::Direct { .. } => unreachable!(), + }; + let base_data_files = self + .detect_commit_conflicts( latest_snapshot, - partition_filter.as_ref(), - new_entries, + retry_state, + &entries, + &CommitKind::OVERWRITE, + None, ) .await?; @@ -710,12 +1181,11 @@ impl TableCommit { all = retained; } } - Self::validate_global_index_overlap(&all, new_index_entries)?; - Self::validate_added_global_index_overlap(new_index_entries)?; - all.extend_from_slice(new_index_entries); + let all = Self::merge_index_entries(&all, &new_index_entries, false)?; let index_manifest_changed = all != previous; let index_manifest_name = if index_manifest_changed { - Self::write_index_manifest(file_io, &manifest_dir, &all).await? + self.write_index_manifest(file_io, &manifest_dir, &all) + .await? } else { latest_snapshot .as_ref() @@ -728,6 +1198,7 @@ impl TableCommit { kind: CommitKind::OVERWRITE, index_manifest_name, index_manifest_changed, + base_data_files, }) } } @@ -747,7 +1218,24 @@ impl TableCommit { } else { previous_entries.to_vec() }; - let new_hash_keys: HashSet<(Vec, i32)> = new_index_entries + let deletions = new_index_entries + .iter() + .filter(|entry| entry.kind == FileKind::Delete) + .collect::>(); + if !deletions.is_empty() { + all.retain(|entry| { + !deletions + .iter() + .any(|delete| same_index_file_entry(entry, delete)) + }); + } + + let additions = new_index_entries + .iter() + .filter(|entry| entry.kind == FileKind::Add) + .cloned() + .collect::>(); + let new_hash_keys: HashSet<(Vec, i32)> = additions .iter() .filter(|e| e.index_file.index_type == "HASH") .map(|e| (e.partition.clone(), e.bucket)) @@ -759,12 +1247,117 @@ impl TableCommit { true } }); - Self::validate_global_index_overlap(&all, new_index_entries)?; - Self::validate_added_global_index_overlap(new_index_entries)?; - all.extend_from_slice(new_index_entries); + Self::validate_global_index_overlap(&all, &additions)?; + Self::validate_added_global_index_overlap(&additions)?; + all.extend(additions); Ok(all) } + fn global_index_update_entries( + &self, + previous_entries: &[IndexManifestEntry], + commit_entries: &[ManifestEntry], + new_index_entries: &[IndexManifestEntry], + ) -> Result> { + if new_index_entries + .iter() + .any(|entry| entry.kind == FileKind::Delete) + { + return Ok(vec![]); + } + + let mut updated_cols = HashSet::new(); + let mut written_partitions: Vec> = Vec::new(); + for entry in commit_entries + .iter() + .filter(|entry| *entry.kind() == FileKind::Add) + { + let Some(write_cols) = entry.file().write_cols.as_ref() else { + continue; + }; + for col in write_cols { + if !is_system_field(col) { + updated_cols.insert(col.clone()); + } + } + if !written_partitions + .iter() + .any(|partition| same_index_partition(partition, entry.partition())) + { + written_partitions.push(entry.partition().to_vec()); + } + } + if updated_cols.is_empty() || written_partitions.is_empty() { + return Ok(vec![]); + } + + let field_by_id = self + .table + .schema() + .fields() + .iter() + .map(|field| (field.id(), field.name().to_string())) + .collect::>(); + + let mut affected = Vec::new(); + let mut conflicted_cols = HashSet::new(); + for entry in previous_entries { + if entry.kind != FileKind::Add + || !written_partitions + .iter() + .any(|partition| same_index_partition(partition, &entry.partition)) + { + continue; + } + let Some(global_meta) = entry.index_file.global_index_meta.as_ref() else { + continue; + }; + let mut indexed_field_ids = vec![global_meta.index_field_id]; + if let Some(extra_field_ids) = global_meta.extra_field_ids.as_ref() { + indexed_field_ids.extend(extra_field_ids.iter().copied()); + } + let matched = indexed_field_ids + .iter() + .filter_map(|field_id| field_by_id.get(field_id)) + .filter(|field_name| updated_cols.contains(*field_name)) + .cloned() + .collect::>(); + if !matched.is_empty() { + conflicted_cols.extend(matched); + affected.push(entry.clone()); + } + } + if affected.is_empty() { + return Ok(vec![]); + } + + match CoreOptions::new(self.table.schema().options()).global_index_column_update_action()? { + GlobalIndexColumnUpdateAction::DropPartitionIndex => Ok(affected + .into_iter() + .map(|entry| IndexManifestEntry { + kind: FileKind::Delete, + partition: entry.partition, + bucket: entry.bucket, + index_file: entry.index_file, + version: entry.version, + }) + .collect()), + GlobalIndexColumnUpdateAction::ThrowError => { + let mut updated = updated_cols.into_iter().collect::>(); + updated.sort(); + let mut conflicted = conflicted_cols.into_iter().collect::>(); + conflicted.sort(); + Err(crate::Error::DataInvalid { + message: format!( + "Update columns contain globally indexed columns, not supported now. Updated columns: {:?}. Conflicted columns: {:?}.", + updated, conflicted + ), + source: None, + }) + } + } + } + fn validate_global_index_overlap( retained_entries: &[IndexManifestEntry], added_entries: &[IndexManifestEntry], @@ -844,148 +1437,729 @@ impl TableCommit { if let Some(snap) = latest_snapshot { if let Some(prev_index_manifest) = snap.index_manifest() { let prev_path = format!("{manifest_dir}/{prev_index_manifest}"); - return IndexManifest::read(file_io, &prev_path).await; + return Ok(normalize_index_entries( + IndexManifest::read(file_io, &prev_path).await?, + )); } } Ok(vec![]) } - /// Generate overwrite entries: DELETE existing + ADD new. - async fn generate_overwrite_entries( + /// Stateful overwrite provider mirroring Python `OverwriteChangesProvider`. + async fn provide_overwrite_entries( &self, + plan: &mut CommitEntriesPlan, latest_snapshot: &Option, - partition_filter: Option<&PartitionFilter>, - new_entries: &[ManifestEntry], ) -> Result> { - let mut entries = Vec::new(); + let CommitEntriesPlan::Overwrite { + partition_filter, + new_entries, + cached_snapshot, + cached_entries, + full_scan_count, + delta_probe_count, + .. + } = plan + else { + unreachable!("provide_overwrite_entries only accepts overwrite plans"); + }; - if let Some(snap) = latest_snapshot { - let scan = TableScan::new( - &self.table, - partition_filter.cloned(), - vec![], - None, - None, - None, - ) - .with_scan_all_files(); - let current_entries = scan.plan_manifest_entries(snap).await?; - for entry in current_entries { - entries.push(entry.with_kind(FileKind::Delete)); + let Some(latest) = latest_snapshot else { + return Ok(Self::build_overwrite_result(&[], new_entries)); + }; + + let rebuild_cache = match cached_snapshot.as_ref() { + None => true, + Some(cached) if cached.id() > latest.id() => { + return Err(crate::Error::DataInvalid { + message: format!( + "Cached snapshot id {} is greater than latest snapshot id {}.", + cached.id(), + latest.id() + ), + source: None, + }); + } + Some(cached) if cached.id() < latest.id() => { + !self + .can_use_overwrite_cache( + cached, + latest, + partition_filter.as_ref(), + delta_probe_count, + ) + .await? + } + Some(_) => false, + }; + + if rebuild_cache { + *cached_entries = self + .scan_snapshot_entries(latest_snapshot, partition_filter.as_ref()) + .await?; + *full_scan_count += 1; + } + *cached_snapshot = Some(Box::new(latest.clone())); + + Ok(Self::build_overwrite_result(cached_entries, new_entries)) + } + + async fn can_use_overwrite_cache( + &self, + cached_snapshot: &Snapshot, + latest_snapshot: &Snapshot, + partition_filter: Option<&PartitionFilter>, + delta_probe_count: &mut usize, + ) -> Result { + let Some(partition_filter) = partition_filter else { + return Ok(false); + }; + + for snapshot_id in cached_snapshot.id() + 1..=latest_snapshot.id() { + *delta_probe_count += 1; + let snapshot = match self.snapshot_manager.get_snapshot(snapshot_id).await { + Ok(snapshot) => snapshot, + Err(_) => return Ok(false), + }; + if snapshot.commit_kind() != &CommitKind::APPEND { + return Ok(false); + } + let delta_entries = self + .read_delta_entries(Some(partition_filter), &snapshot) + .await?; + if !delta_entries.is_empty() { + return Ok(false); } } + Ok(true) + } + + fn build_overwrite_result( + existing_entries: &[ManifestEntry], + new_entries: &[ManifestEntry], + ) -> Vec { + let mut entries = existing_entries + .iter() + .cloned() + .map(|entry| entry.with_kind(FileKind::Delete)) + .collect::>(); entries.extend(new_entries.iter().cloned()); - Ok(entries) + entries } - /// Assign row tracking metadata: snapshot ID as sequence number, and - /// first_row_id for new APPEND files that don't already have one. - /// Normal files advance the main counter. Blob files (identified by file name) - /// use per-column counters starting from the same base, since each blob column - /// rolls independently. - fn assign_row_tracking_meta( + async fn scan_snapshot_entries( &self, - snapshot_id: i64, - first_row_id_start: i64, - entries: Vec, - ) -> (Vec, i64) { - let mut result = Vec::with_capacity(entries.len()); - let mut start = first_row_id_start; - // Per blob column (write_cols key) counter, each starts from first_row_id_start. - let mut blob_starts: HashMap, i64> = HashMap::new(); + snapshot: &Option, + partition_filter: Option<&PartitionFilter>, + ) -> Result> { + let Some(snap) = snapshot else { + return Ok(vec![]); + }; + let file_io = self.snapshot_manager.file_io(); + let manifest_dir = self.snapshot_manager.manifest_dir(); + let mut entries = Vec::new(); + for manifest_list in [snap.base_manifest_list(), snap.delta_manifest_list()] { + let manifest_list_path = format!("{manifest_dir}/{manifest_list}"); + for manifest_file in ManifestList::read(file_io, &manifest_list_path).await? { + let manifest_path = format!("{manifest_dir}/{}", manifest_file.file_name()); + for entry in Manifest::read(file_io, &manifest_path).await? { + if let Some(filter) = partition_filter { + if !filter.matches_entry(entry.partition())? { + continue; + } + } + entries.push(entry); + } + } + } + Ok(merge_active_entries(entries)) + } - for entry in entries { - let mut entry = entry.with_sequence_number(snapshot_id, snapshot_id); - if *entry.kind() == FileKind::Add - && entry.file().file_source == Some(0) // APPEND - && entry.file().first_row_id.is_none() - { - let is_blob_file = - crate::table::blob_file_writer::is_blob_file_name(&entry.file().file_name); - if is_blob_file { - let key = entry.file().write_cols.clone().unwrap_or_default(); - let blob_start = blob_starts.entry(key).or_insert(first_row_id_start); - entry = entry.with_first_row_id(*blob_start); - *blob_start += entry.file().row_count; - } else { - entry = entry.with_first_row_id(start); - start += entry.file().row_count; + async fn scan_changed_partition_entries( + &self, + snapshot: &Option, + commit_entries: &[ManifestEntry], + ) -> Result> { + let entry_refs = commit_entries.iter().collect::>(); + let partition_filter = self.build_entries_partition_filter(&entry_refs)?; + self.scan_snapshot_entries(snapshot, partition_filter.as_ref()) + .await + } + + async fn read_delta_entries( + &self, + partition_filter: Option<&PartitionFilter>, + snapshot: &Snapshot, + ) -> Result> { + let file_io = self.snapshot_manager.file_io(); + let manifest_dir = self.snapshot_manager.manifest_dir(); + let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); + let manifest_files = ManifestList::read(file_io, &delta_path).await?; + let mut entries = Vec::new(); + for manifest in manifest_files { + let path = format!("{manifest_dir}/{}", manifest.file_name()); + for entry in Manifest::read(file_io, &path).await? { + if let Some(filter) = partition_filter { + if !filter.matches_entry(entry.partition())? { + continue; + } } + entries.push(entry); } - result.push(entry); } + Ok(entries) + } - (result, start) + async fn read_incremental_changes( + &self, + from_snapshot: &Snapshot, + to_snapshot: &Snapshot, + commit_entries: &[ManifestEntry], + ) -> Result>> { + let entry_refs = commit_entries.iter().collect::>(); + let partition_filter = self.build_entries_partition_filter(&entry_refs)?; + let mut entries = Vec::new(); + for snapshot_id in from_snapshot.id() + 1..=to_snapshot.id() { + let snapshot = match self.snapshot_manager.get_snapshot(snapshot_id).await { + Ok(snapshot) => snapshot, + Err(_) => return Ok(None), + }; + entries.extend( + self.read_delta_entries(partition_filter.as_ref(), &snapshot) + .await?, + ); + } + Ok(Some(entries)) } - /// Validate that files with pre-assigned `first_row_id` (e.g. partial-column - /// files from MERGE INTO) still match existing files in the current snapshot. - /// - /// When MERGE INTO and COMPACT run concurrently, compaction may rewrite the - /// original files that partial-column files reference. If the original file's - /// row ID range no longer exists, the partial-column files become invalid and - /// the commit must be rejected. - async fn validate_row_id_alignment( + async fn detect_commit_conflicts( &self, + latest_snapshot: &Option, + retry_state: Option<&RetryState>, commit_entries: &[ManifestEntry], + commit_kind: &CommitKind, + check_from_snapshot: Option, + ) -> Result>> { + let base_data_files = self + .resolve_conflict_base_entries(latest_snapshot, retry_state, commit_entries) + .await?; + self.check_commit_conflicts( + latest_snapshot.as_ref(), + &base_data_files, + commit_entries, + commit_kind, + check_from_snapshot, + ) + .await?; + Ok(Some(base_data_files)) + } + + async fn resolve_conflict_base_entries( + &self, latest_snapshot: &Option, + retry_state: Option<&RetryState>, + commit_entries: &[ManifestEntry], + ) -> Result> { + let Some(latest) = latest_snapshot else { + return Ok(vec![]); + }; + + if let Some(RetryState { + latest_snapshot: Some(previous_snapshot), + base_data_files: Some(previous_base), + }) = retry_state + { + if let Some(incremental) = self + .read_incremental_changes(previous_snapshot, latest, commit_entries) + .await? + { + let mut base = previous_base.clone(); + base.extend(incremental); + return Ok(merge_active_entries(base)); + } + } + + self.scan_changed_partition_entries(latest_snapshot, commit_entries) + .await + } + + async fn check_commit_conflicts( + &self, + latest_snapshot: Option<&Snapshot>, + base_entries: &[ManifestEntry], + delta_entries: &[ManifestEntry], + commit_kind: &CommitKind, + check_from_snapshot: Option, ) -> Result<()> { - // Collect files that already have first_row_id assigned (pre-set by writer). - let files_to_check: Vec<_> = commit_entries - .iter() - .filter(|e| *e.kind() == FileKind::Add && e.file().first_row_id.is_some()) - .collect(); + self.check_delete_entries_against_base(base_entries, delta_entries)?; - if files_to_check.is_empty() { + if !self.data_evolution_enabled { return Ok(()); } - let snap = match latest_snapshot { - Some(s) => s, - None => { - // No existing snapshot means no existing files — any pre-assigned - // first_row_id cannot match anything. - let entry = &files_to_check[0]; + let next_row_id = latest_snapshot.and_then(Snapshot::next_row_id); + self.check_row_id_existence(base_entries, delta_entries, next_row_id)?; + + let mut all_entries = base_entries.to_vec(); + all_entries.extend(delta_entries.iter().cloned()); + let merged_entries = merge_active_entries(all_entries); + self.check_row_id_range_conflicts(commit_kind, check_from_snapshot, &merged_entries)?; + self.check_row_id_from_snapshot(latest_snapshot, delta_entries, check_from_snapshot) + .await + } + + fn check_delete_entries_against_base( + &self, + base_entries: &[ManifestEntry], + delta_entries: &[ManifestEntry], + ) -> Result<()> { + let base_identifiers = base_entries + .iter() + .map(ManifestEntry::identifier) + .collect::>(); + for entry in delta_entries + .iter() + .filter(|entry| *entry.kind() == FileKind::Delete) + { + if !base_identifiers.contains(&entry.identifier()) { return Err(crate::Error::DataInvalid { message: format!( - "Row ID conflict: file '{}' has pre-assigned first_row_id={} \ - but no snapshot exists. The referenced files may have been removed \ - by a concurrent compaction.", + "Delete conflict: file '{}' in bucket {} does not exist in the current snapshot.", entry.file().file_name, - entry.file().first_row_id.unwrap(), + entry.bucket(), ), source: None, }); } - }; + } + Ok(()) + } - // Read current files from the latest snapshot, filtered by partitions. - let partition_filter = self.build_entries_partition_filter(&files_to_check)?; - let scan = TableScan::new(&self.table, partition_filter, vec![], None, None, None) - .with_scan_all_files(); - let existing_entries = scan.plan_manifest_entries(snap).await?; + fn check_row_id_existence( + &self, + base_entries: &[ManifestEntry], + delta_entries: &[ManifestEntry], + next_row_id: Option, + ) -> Result<()> { + let Some(next_row_id) = next_row_id else { + return Ok(()); + }; - // Build index: (partition, bucket, first_row_id, row_count) - let existing_index: HashSet<(&[u8], i32, i64, i64)> = existing_entries + let files_to_check = delta_entries .iter() - .filter_map(|e| { - e.file() - .first_row_id - .map(|fid| (e.partition(), e.bucket(), fid, e.file().row_count)) + .filter(|entry| { + *entry.kind() == FileKind::Add + && entry + .file() + .first_row_id + .is_some_and(|first_row_id| first_row_id < next_row_id) }) - .collect(); + .collect::>(); + if files_to_check.is_empty() { + return Ok(()); + } - for entry in &files_to_check { - let fid = entry.file().first_row_id.unwrap(); - let key = ( - entry.partition(), - entry.bucket(), - fid, - entry.file().row_count, - ); - if !existing_index.contains(&key) { - return Err(crate::Error::DataInvalid { + let mut existing_index: HashSet<(Vec, i32, i64, i64)> = HashSet::new(); + let mut existing_ranges: ExistingRowIdRanges = HashMap::new(); + for base in base_entries { + if let Some(first_row_id) = base.file().first_row_id { + existing_index.insert(( + base.partition().to_vec(), + base.bucket(), + first_row_id, + base.file().row_count, + )); + if !is_dedicated_storage_file(base.file()) { + existing_ranges + .entry((base.partition().to_vec(), base.bucket())) + .or_default() + .push((first_row_id, first_row_id + base.file().row_count - 1)); + } + } + } + + for entry in files_to_check { + let first_row_id = entry.file().first_row_id.unwrap(); + if is_dedicated_storage_file(entry.file()) { + if let Some((start, end)) = entry.file().row_id_range() { + let overlaps_existing = existing_ranges + .get(&(entry.partition().to_vec(), entry.bucket())) + .is_some_and(|ranges| { + ranges.iter().any(|&(base_start, base_end)| { + ranges_overlap(start, end, base_start, base_end) + }) + }); + if overlaps_existing { + continue; + } + } + } + + let key = ( + entry.partition().to_vec(), + entry.bucket(), + first_row_id, + entry.file().row_count, + ); + if !existing_index.contains(&key) { + return Err(crate::Error::DataInvalid { + message: format!( + "Row ID existence conflict: file '{}' references first_row_id={}, row_count={} in bucket {}, but no matching file exists in the current snapshot.", + entry.file().file_name, + first_row_id, + entry.file().row_count, + entry.bucket(), + ), + source: None, + }); + } + } + Ok(()) + } + + fn check_row_id_range_conflicts( + &self, + commit_kind: &CommitKind, + check_from_snapshot: Option, + commit_entries: &[ManifestEntry], + ) -> Result<()> { + if check_from_snapshot.is_none() && commit_kind != &CommitKind::COMPACT { + return Ok(()); + } + + let entries = commit_entries + .iter() + .filter(|entry| { + entry.file().first_row_id.is_some() && !is_dedicated_storage_file(entry.file()) + }) + .collect::>(); + for (idx, left) in entries.iter().enumerate() { + let Some((left_start, left_end)) = left.file().row_id_range() else { + continue; + }; + for right in entries.iter().skip(idx + 1) { + let Some((right_start, right_end)) = right.file().row_id_range() else { + continue; + }; + if ranges_overlap(left_start, left_end, right_start, right_end) + && (left_start, left_end) != (right_start, right_end) + { + return Err(crate::Error::DataInvalid { + message: format!( + "For Data Evolution table, multiple operations have row-id range conflicts: {} [{}, {}] and {} [{}, {}].", + left.file().file_name, + left_start, + left_end, + right.file().file_name, + right_start, + right_end, + ), + source: None, + }); + } + } + } + Ok(()) + } + + async fn check_row_id_from_snapshot( + &self, + latest_snapshot: Option<&Snapshot>, + delta_entries: &[ManifestEntry], + check_from_snapshot: Option, + ) -> Result<()> { + let Some(check_from_snapshot) = check_from_snapshot else { + return Ok(()); + }; + let Some(latest_snapshot) = latest_snapshot else { + return Ok(()); + }; + + let source_snapshot = self + .snapshot_manager + .get_snapshot(check_from_snapshot) + .await?; + let check_next_row_id = + source_snapshot + .next_row_id() + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "Next row id cannot be null for snapshot {check_from_snapshot}." + ), + source: None, + })?; + + let write_ranges = self.build_row_id_write_ranges(delta_entries).await?; + if write_ranges.is_empty() { + return Ok(()); + } + + let delta_entry_refs = delta_entries.iter().collect::>(); + let partition_filter = self.build_entries_partition_filter(&delta_entry_refs)?; + for snapshot_id in check_from_snapshot + 1..=latest_snapshot.id() { + let snapshot = self.snapshot_manager.get_snapshot(snapshot_id).await?; + if snapshot.commit_kind() == &CommitKind::COMPACT { + continue; + } + for entry in self + .read_delta_entries(partition_filter.as_ref(), &snapshot) + .await? + .into_iter() + .filter(|entry| *entry.kind() == FileKind::Add) + { + let Some((start, end)) = entry.file().row_id_range() else { + continue; + }; + if start >= check_next_row_id { + continue; + } + let committed_field_ids = self.write_field_ids(entry.file()).await?; + if write_ranges.iter().any(|range| { + ranges_overlap(range.start, range.end, start, end) + && range + .field_ids + .iter() + .any(|field_id| committed_field_ids.contains(field_id)) + }) { + return Err(crate::Error::DataInvalid { + message: "For Data Evolution table, multiple MERGE INTO operations have encountered conflicts, updating the same file, which can render some updates ineffective.".to_string(), + source: None, + }); + } + } + } + Ok(()) + } + + async fn build_row_id_write_ranges( + &self, + delta_entries: &[ManifestEntry], + ) -> Result> { + let mut ranges = Vec::new(); + for entry in delta_entries + .iter() + .filter(|entry| *entry.kind() == FileKind::Add) + { + let Some((start, end)) = entry.file().row_id_range() else { + continue; + }; + let field_ids = self.write_field_ids(entry.file()).await?; + if !field_ids.is_empty() { + ranges.push(RowIdWriteRange { + start, + end, + field_ids, + }); + } + } + Ok(ranges) + } + + async fn write_field_ids(&self, file: &DataFileMeta) -> Result> { + let fields = if file.schema_id == self.table.schema().id() { + self.table.schema().fields().to_vec() + } else { + self.table + .schema_manager() + .schema(file.schema_id) + .await? + .fields() + .to_vec() + }; + let field_id_by_name = fields + .iter() + .map(|field| (field.name().to_string(), field.id())) + .collect::>(); + + let mut field_ids = HashSet::new(); + match file.write_cols.as_ref() { + None => { + field_ids.extend( + fields + .iter() + .filter(|field| !is_system_field(field.name())) + .map(|field| field.id()), + ); + } + Some(write_cols) => { + for col in write_cols { + if is_system_field(col) { + continue; + } + let Some(field_id) = field_id_by_name.get(col) else { + return Err(crate::Error::DataInvalid { + message: format!( + "Cannot find write column '{}' in schema {}.", + col, file.schema_id + ), + source: None, + }); + }; + field_ids.insert(*field_id); + } + } + } + Ok(field_ids) + } + + /// Assign row tracking metadata: snapshot ID as sequence number, and + /// first_row_id for new APPEND files that don't already have one. + /// Normal files advance the main counter. Blob files (identified by file name) + /// use per-column counters starting from the same base, since each blob column + /// rolls independently. + fn assign_row_tracking_meta( + &self, + snapshot_id: i64, + first_row_id_start: i64, + entries: Vec, + ) -> Result<(Vec, i64)> { + let mut result = Vec::with_capacity(entries.len()); + let mut start = first_row_id_start; + let mut blob_start_default = first_row_id_start; + let mut blob_starts: HashMap = HashMap::new(); + let mut vector_store_start = first_row_id_start; + + for entry in entries { + let mut entry = entry.with_sequence_number(snapshot_id, snapshot_id); + if entry.file().file_source.is_none() { + return Err(crate::Error::DataInvalid { + message: format!( + "file_source must be present for row-tracking table, file={}", + entry.file().file_name + ), + source: None, + }); + } + let contains_row_id = + entry.file().write_cols.as_ref().is_some_and(|cols| { + cols.iter().any(|col| col == crate::spec::ROW_ID_FIELD_NAME) + }); + if *entry.kind() == FileKind::Add + && entry.file().file_source == Some(0) // APPEND + && entry.file().first_row_id.is_none() + && !contains_row_id + { + if is_blob_data_file(entry.file()) { + let blob_field_name = entry + .file() + .write_cols + .as_ref() + .and_then(|cols| cols.first()) + .cloned() + .ok_or_else(|| crate::Error::DataInvalid { + message: format!( + "Blob file '{}' must have write_cols for row-tracking assignment.", + entry.file().file_name + ), + source: None, + })?; + let blob_start = blob_starts + .entry(blob_field_name) + .or_insert(blob_start_default); + if *blob_start >= start { + return Err(crate::Error::DataInvalid { + message: format!( + "This is a bug, blobStart {} should be less than start {} when assigning a blob entry file.", + *blob_start, start + ), + source: None, + }); + } + entry = entry.with_first_row_id(*blob_start); + *blob_start += entry.file().row_count; + } else if is_vector_store_file(entry.file()) { + if vector_store_start >= start { + return Err(crate::Error::DataInvalid { + message: format!( + "This is a bug, vectorStoreStart {} should be less than start {} when assigning a vector-store entry file.", + vector_store_start, start + ), + source: None, + }); + } + entry = entry.with_first_row_id(vector_store_start); + vector_store_start += entry.file().row_count; + } else { + entry = entry.with_first_row_id(start); + blob_start_default = start; + blob_starts.clear(); + start += entry.file().row_count; + } + } + result.push(entry); + } + + Ok((result, start)) + } + + /// Validate that files with pre-assigned `first_row_id` (e.g. partial-column + /// files from MERGE INTO) still match existing files in the current snapshot. + /// + /// When MERGE INTO and COMPACT run concurrently, compaction may rewrite the + /// original files that partial-column files reference. If the original file's + /// row ID range no longer exists, the partial-column files become invalid and + /// the commit must be rejected. + async fn validate_row_id_alignment( + &self, + commit_entries: &[ManifestEntry], + latest_snapshot: &Option, + ) -> Result<()> { + // Collect files that already have first_row_id assigned (pre-set by writer). + let files_to_check: Vec<_> = commit_entries + .iter() + .filter(|e| *e.kind() == FileKind::Add && e.file().first_row_id.is_some()) + .collect(); + + if files_to_check.is_empty() { + return Ok(()); + } + + let snap = match latest_snapshot { + Some(s) => s, + None => { + // No existing snapshot means no existing files — any pre-assigned + // first_row_id cannot match anything. + let entry = &files_to_check[0]; + return Err(crate::Error::DataInvalid { + message: format!( + "Row ID conflict: file '{}' has pre-assigned first_row_id={} \ + but no snapshot exists. The referenced files may have been removed \ + by a concurrent compaction.", + entry.file().file_name, + entry.file().first_row_id.unwrap(), + ), + source: None, + }); + } + }; + + // Read current files from the latest snapshot, filtered by partitions. + let partition_filter = self.build_entries_partition_filter(&files_to_check)?; + let scan = TableScan::new(&self.table, partition_filter, vec![], None, None, None) + .with_scan_all_files(); + let existing_entries = scan.plan_manifest_entries(snap).await?; + + // Build index: (partition, bucket, first_row_id, row_count) + let existing_index: HashSet<(&[u8], i32, i64, i64)> = existing_entries + .iter() + .filter_map(|e| { + e.file() + .first_row_id + .map(|fid| (e.partition(), e.bucket(), fid, e.file().row_count)) + }) + .collect(); + + for entry in &files_to_check { + let fid = entry.file().first_row_id.unwrap(); + let key = ( + entry.partition(), + entry.bucket(), + fid, + entry.file().row_count, + ); + if !existing_index.contains(&key) { + return Err(crate::Error::DataInvalid { message: format!( "Row ID conflict: file '{}' references first_row_id={}, row_count={} \ in partition/bucket ({}, {}), but no matching file exists in the \ @@ -1213,6 +2387,14 @@ impl TableCommit { Ok(spec) } + /// Earliest source snapshot requested by row-id conflict checks. + fn min_check_from_snapshot(messages: &[CommitMessage]) -> Option { + messages + .iter() + .filter_map(|message| message.check_from_snapshot) + .min() + } + /// Convert commit messages to manifest entries (ADD/DELETE kind). fn messages_to_entries(&self, messages: &[CommitMessage]) -> Vec { messages @@ -1267,7 +2449,8 @@ impl TableCommit { messages .iter() .flat_map(|msg| { - msg.new_index_files + let adds = msg + .new_index_files .iter() .map(move |index_file| IndexManifestEntry { kind: FileKind::Add, @@ -1275,7 +2458,18 @@ impl TableCommit { bucket: msg.bucket, index_file: index_file.clone(), version: 1, - }) + }); + let deletes = + msg.deleted_index_files + .iter() + .map(move |index_file| IndexManifestEntry { + kind: FileKind::Delete, + partition: msg.partition.clone(), + bucket: msg.bucket, + index_file: index_file.clone(), + version: 1, + }); + adds.chain(deletes) }) .collect() } @@ -1303,15 +2497,38 @@ enum CommitEntriesPlan { entries: Vec, changelog_entries: Vec, new_index_entries: Vec, + check_from_snapshot: Option, }, /// Overwrite with optional partition filter. Overwrite { partition_filter: Option, new_entries: Vec, new_index_entries: Vec, + cached_snapshot: Option>, + cached_entries: Vec, + full_scan_count: usize, + delta_probe_count: usize, }, } +impl CommitEntriesPlan { + fn commit_kind_hint(&self) -> CommitKind { + match self { + CommitEntriesPlan::Direct { entries, .. } => { + if entries + .iter() + .any(|entry| *entry.kind() == FileKind::Delete) + { + CommitKind::OVERWRITE + } else { + CommitKind::APPEND + } + } + CommitEntriesPlan::Overwrite { .. } => CommitKind::OVERWRITE, + } + } +} + /// Fully resolved commit ready for writing. struct ResolvedCommit { entries: Vec, @@ -1319,12 +2536,67 @@ struct ResolvedCommit { kind: CommitKind, index_manifest_name: Option, index_manifest_changed: bool, + base_data_files: Option>, +} + +enum CommitAttemptResult { + Success, + Retry(Box), +} + +struct RetryState { + latest_snapshot: Option, + base_data_files: Option>, +} + +struct RowIdWriteRange { + start: i64, + end: i64, + field_ids: HashSet, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum FileStorageKind { + Normal, + Blob, + Vector, } fn ranges_overlap(left_start: i64, left_end: i64, right_start: i64, right_end: i64) -> bool { left_start <= right_end && right_start <= left_end } +fn is_blob_data_file(file: &DataFileMeta) -> bool { + crate::table::blob_file_writer::is_blob_file_name(&file.file_name) +} + +fn is_vector_store_file(file: &DataFileMeta) -> bool { + file.file_name.contains(".vector.") +} + +fn is_dedicated_storage_file(file: &DataFileMeta) -> bool { + !matches!(file_storage_kind(file), FileStorageKind::Normal) +} + +fn file_storage_kind(file: &DataFileMeta) -> FileStorageKind { + if is_blob_data_file(file) { + FileStorageKind::Blob + } else if is_vector_store_file(file) { + FileStorageKind::Vector + } else { + FileStorageKind::Normal + } +} + +fn is_system_field(name: &str) -> bool { + matches!( + name, + crate::spec::ROW_ID_FIELD_NAME + | crate::spec::SEQUENCE_NUMBER_FIELD_NAME + | crate::spec::VALUE_KIND_FIELD_NAME + ) +} + fn global_index_overlap_error( retained: &IndexManifestEntry, retained_meta: &crate::spec::GlobalIndexMeta, @@ -1349,6 +2621,39 @@ fn global_index_overlap_error( } } +fn normalize_index_entries(entries: Vec) -> Vec { + let mut active = Vec::new(); + for entry in entries { + match entry.kind { + FileKind::Add => { + active.retain(|current| !same_index_file_entry(current, &entry)); + active.push(entry); + } + FileKind::Delete => { + active.retain(|current| !same_index_file_entry(current, &entry)); + } + } + } + active +} + +fn same_index_file_entry(left: &IndexManifestEntry, right: &IndexManifestEntry) -> bool { + same_index_partition(&left.partition, &right.partition) + && left.bucket == right.bucket + && left.index_file.index_type == right.index_file.index_type + && left.index_file.file_name == right.index_file.file_name +} + +fn same_index_partition(left: &[u8], right: &[u8]) -> bool { + left == right || (is_empty_partition(left) && is_empty_partition(right)) +} + +fn is_empty_partition(partition: &[u8]) -> bool { + partition.is_empty() + || partition == EMPTY_SERIALIZED_ROW.as_slice() + || partition == [0, 0, 0, 0] +} + fn validate_expected_latest_snapshot( expected_snapshot_id: Option, latest_snapshot: &Option, @@ -1449,6 +2754,20 @@ mod tests { ) } + fn test_table_with_options( + file_io: &FileIO, + table_path: &str, + options: HashMap, + ) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_schema().copy_with_options(options), + None, + ) + } + fn test_data_file(name: &str, row_count: i64) -> DataFileMeta { DataFileMeta { file_name: name.to_string(), @@ -1500,6 +2819,21 @@ mod tests { } } + fn test_global_index_file_with_extra_fields( + name: &str, + index_field_id: i32, + extra_field_ids: Vec, + row_range_start: i64, + row_range_end: i64, + ) -> IndexFileMeta { + let mut file = test_global_index_file(name, index_field_id, row_range_start, row_range_end); + file.global_index_meta + .as_mut() + .expect("global index meta") + .extra_field_ids = Some(extra_field_ids); + file + } + fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit { let table = test_table(file_io, table_path); TableCommit::new(table, "test-user".to_string()) @@ -1510,6 +2844,70 @@ mod tests { TableCommit::new(table, "test-user".to_string()) } + fn partition_filter_for(commit: &TableCommit, partitions: Vec>) -> PartitionFilter { + PartitionFilter::from_partition_set( + partitions.into_iter().collect(), + &commit.table.schema().partition_fields(), + ) + .unwrap() + } + + fn overwrite_plan_counts(plan: &CommitEntriesPlan) -> (usize, usize) { + match plan { + CommitEntriesPlan::Overwrite { + full_scan_count, + delta_probe_count, + .. + } => (*full_scan_count, *delta_probe_count), + CommitEntriesPlan::Direct { .. } => unreachable!(), + } + } + + fn overwrite_plan( + partition_filter: Option, + new_entries: Vec, + ) -> CommitEntriesPlan { + CommitEntriesPlan::Overwrite { + partition_filter, + new_entries, + new_index_entries: vec![], + cached_snapshot: None, + cached_entries: Vec::new(), + full_scan_count: 0, + delta_probe_count: 0, + } + } + + async fn latest_snapshot(file_io: &FileIO, table_path: &str) -> Option { + SnapshotManager::new(file_io.clone(), table_path.to_string()) + .get_latest_snapshot() + .await + .unwrap() + } + + async fn active_entries( + file_io: &FileIO, + table_path: &str, + snapshot: &Snapshot, + ) -> Vec { + let manifest_dir = format!("{table_path}/manifest"); + let mut entries = Vec::new(); + for list in [ + snapshot.base_manifest_list(), + snapshot.delta_manifest_list(), + ] { + let list_path = format!("{manifest_dir}/{list}"); + for meta in ManifestList::read(file_io, &list_path).await.unwrap() { + entries.extend( + Manifest::read(file_io, &format!("{manifest_dir}/{}", meta.file_name())) + .await + .unwrap(), + ); + } + } + merge_active_entries(entries) + } + fn partition_bytes(pt: &str) -> Vec { let mut builder = BinaryRowBuilder::new(1); if pt.len() <= 7 { @@ -1570,6 +2968,63 @@ mod tests { assert_eq!(entries[0].file().file_name, "data-0.parquet"); } + #[tokio::test] + async fn test_commit_with_identifier_writes_snapshot_identifier() { + let file_io = test_file_io(); + let table_path = "memory:/test_commit_with_identifier"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + commit + .commit_with_identifier( + vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-0.parquet", 100)], + )], + 42, + ) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.commit_identifier(), 42); + } + + #[tokio::test] + async fn test_duplicate_commit_requires_same_identifier() { + let file_io = test_file_io(); + let table_path = "memory:/test_duplicate_commit_identifier"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + commit + .commit_with_identifier( + vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-0.parquet", 100)], + )], + 7, + ) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let latest = snap_manager.get_latest_snapshot().await.unwrap(); + assert!( + commit + .is_duplicate_commit(1, &latest, 7, &CommitKind::APPEND) + .await + ); + assert!( + !commit + .is_duplicate_commit(1, &latest, 8, &CommitKind::APPEND) + .await + ); + } + #[tokio::test] async fn test_multiple_appends() { let file_io = test_file_io(); @@ -1759,9 +3214,34 @@ mod tests { } #[tokio::test] - async fn test_append_data_invalidates_previous_global_index() { + async fn test_index_delete_removes_previous_index_manifest_entry() { let file_io = test_file_io(); - let table_path = "memory:/test_append_data_invalidates_previous_global_index"; + let table_path = "memory:/test_index_delete"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let index_file = test_global_index_file("lumina-0.index", 0, 0, 9); + let mut first = CommitMessage::new(vec![], 0, vec![]); + first.new_index_files = vec![index_file.clone()]; + commit.commit(vec![first]).await.unwrap(); + + let mut second = CommitMessage::new(vec![], 0, vec![]); + second.deleted_index_files = vec![index_file]; + commit + .commit_with_identifier(vec![second], 2) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert!(snapshot.index_manifest().is_none()); + } + + #[tokio::test] + async fn test_append_data_preserves_previous_global_index() { + let file_io = test_file_io(); + let table_path = "memory:/test_append_data_preserves_previous_global_index"; setup_dirs(&file_io, table_path).await; let commit = setup_commit(&file_io, table_path); @@ -1781,9 +3261,171 @@ mod tests { let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); assert_eq!(snapshot.id(), 2); + assert!(snapshot.index_manifest().is_some()); + let index_manifest = snapshot.index_manifest().expect("index manifest"); + let index_entries = + IndexManifest::read(&file_io, &format!("{table_path}/manifest/{index_manifest}")) + .await + .unwrap(); + assert_eq!(index_entries.len(), 1); + assert_eq!(index_entries[0].index_file.file_name, "lumina-0.index"); + } + + #[tokio::test] + async fn test_partial_update_indexed_column_rejects_by_default() { + let file_io = test_file_io(); + let table_path = "memory:/test_partial_update_indexed_column_rejects"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let mut first = CommitMessage::new(vec![], 0, vec![]); + first.new_index_files = vec![test_global_index_file("lumina-0.index", 0, 0, 9)]; + commit.commit(vec![first]).await.unwrap(); + + let mut data_file = test_data_file("data-update-id.parquet", 10); + data_file.write_cols = Some(vec!["id".to_string()]); + let result = commit + .commit(vec![CommitMessage::new(vec![], 0, vec![data_file])]) + .await; + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("globally indexed columns"), + "expected global index update error, got: {err_msg}" + ); + } + + #[tokio::test] + async fn test_partial_update_indexed_column_drops_partition_index_when_configured() { + let file_io = test_file_io(); + let table_path = "memory:/test_partial_update_indexed_column_drops_index"; + setup_dirs(&file_io, table_path).await; + + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_schema().copy_with_options(HashMap::from([( + "global-index.column-update-action".to_string(), + "DROP_PARTITION_INDEX".to_string(), + )])), + None, + ); + let commit = TableCommit::new(table, "test-user".to_string()); + + let mut first = CommitMessage::new(vec![], 0, vec![]); + first.new_index_files = vec![test_global_index_file("lumina-0.index", 0, 0, 9)]; + commit.commit(vec![first]).await.unwrap(); + + let mut data_file = test_data_file("data-update-id.parquet", 10); + data_file.write_cols = Some(vec!["id".to_string()]); + commit + .commit(vec![CommitMessage::new(vec![], 0, vec![data_file])]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert!(snapshot.index_manifest().is_none()); + } + + #[tokio::test] + async fn test_partial_update_extra_indexed_column_rejects_by_default() { + let file_io = test_file_io(); + let table_path = "memory:/test_partial_update_extra_indexed_column_rejects"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let mut first = CommitMessage::new(vec![], 0, vec![]); + first.new_index_files = vec![test_global_index_file_with_extra_fields( + "lumina-id-name.index", + 0, + vec![1], + 0, + 9, + )]; + commit.commit(vec![first]).await.unwrap(); + + let mut data_file = test_data_file("data-update-name.parquet", 10); + data_file.write_cols = Some(vec!["name".to_string()]); + let result = commit + .commit(vec![CommitMessage::new(vec![], 0, vec![data_file])]) + .await; + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("Conflicted columns") + && err_msg.contains("name") + && err_msg.contains("globally indexed columns"), + "expected extra-field global index update error, got: {err_msg}" + ); + } + + #[tokio::test] + async fn test_partial_update_extra_indexed_column_drops_partition_index_when_configured() { + let file_io = test_file_io(); + let table_path = "memory:/test_partial_update_extra_indexed_column_drops_index"; + setup_dirs(&file_io, table_path).await; + + let table = test_table_with_options( + &file_io, + table_path, + HashMap::from([( + "global-index.column-update-action".to_string(), + "DROP_PARTITION_INDEX".to_string(), + )]), + ); + let commit = TableCommit::new(table, "test-user".to_string()); + + let mut first = CommitMessage::new(vec![], 0, vec![]); + first.new_index_files = vec![test_global_index_file_with_extra_fields( + "lumina-id-name.index", + 0, + vec![1], + 0, + 9, + )]; + commit.commit(vec![first]).await.unwrap(); + + let mut data_file = test_data_file("data-update-name.parquet", 10); + data_file.write_cols = Some(vec!["name".to_string()]); + commit + .commit(vec![CommitMessage::new(vec![], 0, vec![data_file])]) + .await + .unwrap(); + + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + assert_eq!(snapshot.id(), 2); assert!(snapshot.index_manifest().is_none()); } + #[tokio::test] + async fn test_partial_update_non_indexed_column_preserves_global_index() { + let file_io = test_file_io(); + let table_path = "memory:/test_partial_update_non_indexed_column_preserves_index"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let mut first = CommitMessage::new(vec![], 0, vec![]); + first.new_index_files = vec![test_global_index_file("lumina-0.index", 0, 0, 9)]; + commit.commit(vec![first]).await.unwrap(); + + let mut data_file = test_data_file("data-update-name.parquet", 10); + data_file.write_cols = Some(vec!["name".to_string()]); + commit + .commit(vec![CommitMessage::new(vec![], 0, vec![data_file])]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert!(snapshot.index_manifest().is_some()); + } + #[tokio::test] async fn test_truncate_table() { let file_io = test_file_io(); @@ -1859,6 +3501,255 @@ mod tests { assert_eq!(snapshot.total_record_count(), Some(250)); } + #[tokio::test] + async fn test_overwrite_cache_reuses_when_append_misses_target_partition() { + let file_io = test_file_io(); + let table_path = "memory:/test_overwrite_cache_reuse"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + commit + .commit(vec![ + CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + ), + CommitMessage::new( + partition_bytes("b"), + 0, + vec![test_data_file("data-b.parquet", 200)], + ), + ]) + .await + .unwrap(); + + let new_entries = commit.messages_to_entries(&[CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a2.parquet", 50)], + )]); + let mut plan = overwrite_plan( + Some(partition_filter_for(&commit, vec![partition_bytes("a")])), + new_entries, + ); + + let snapshot1 = latest_snapshot(&file_io, table_path).await; + let first = commit + .provide_overwrite_entries(&mut plan, &snapshot1) + .await + .unwrap(); + assert_eq!(overwrite_plan_counts(&plan), (1, 0)); + assert!(first.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == "data-a.parquet" + })); + + commit + .commit(vec![CommitMessage::new( + partition_bytes("z"), + 0, + vec![test_data_file("data-z.parquet", 10)], + )]) + .await + .unwrap(); + + let snapshot2 = latest_snapshot(&file_io, table_path).await; + let second = commit + .provide_overwrite_entries(&mut plan, &snapshot2) + .await + .unwrap(); + assert_eq!( + overwrite_plan_counts(&plan), + (1, 1), + "unrelated APPEND should reuse the cached target-partition scan" + ); + assert!(second.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == "data-a.parquet" + })); + assert!(!second + .iter() + .any(|entry| entry.file().file_name == "data-z.parquet")); + } + + #[tokio::test] + async fn test_overwrite_cache_rebuilds_when_append_hits_target_partition() { + let file_io = test_file_io(); + let table_path = "memory:/test_overwrite_cache_rebuild_target_append"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + commit + .commit(vec![CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + )]) + .await + .unwrap(); + + let new_entries = commit.messages_to_entries(&[CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a2.parquet", 50)], + )]); + let mut plan = overwrite_plan( + Some(partition_filter_for(&commit, vec![partition_bytes("a")])), + new_entries, + ); + + let snapshot1 = latest_snapshot(&file_io, table_path).await; + commit + .provide_overwrite_entries(&mut plan, &snapshot1) + .await + .unwrap(); + + commit + .commit(vec![CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a3.parquet", 10)], + )]) + .await + .unwrap(); + + let snapshot2 = latest_snapshot(&file_io, table_path).await; + let second = commit + .provide_overwrite_entries(&mut plan, &snapshot2) + .await + .unwrap(); + assert_eq!( + overwrite_plan_counts(&plan), + (2, 1), + "target-partition APPEND must force a full scan rebuild" + ); + let deleted = second + .iter() + .filter(|entry| *entry.kind() == FileKind::Delete) + .map(|entry| entry.file().file_name.as_str()) + .collect::>(); + assert!(deleted.contains("data-a.parquet")); + assert!( + deleted.contains("data-a3.parquet"), + "rebuilt overwrite scan must delete the concurrent target append too" + ); + } + + #[tokio::test] + async fn test_overwrite_cache_rebuilds_on_non_append_snapshot() { + let file_io = test_file_io(); + let table_path = "memory:/test_overwrite_cache_rebuild_non_append"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + commit + .commit(vec![CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + )]) + .await + .unwrap(); + + let new_entries = commit.messages_to_entries(&[CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a2.parquet", 50)], + )]); + let mut plan = overwrite_plan( + Some(partition_filter_for(&commit, vec![partition_bytes("a")])), + new_entries, + ); + + let snapshot1 = latest_snapshot(&file_io, table_path).await; + commit + .provide_overwrite_entries(&mut plan, &snapshot1) + .await + .unwrap(); + + commit + .overwrite( + vec![CommitMessage::new( + partition_bytes("z"), + 0, + vec![test_data_file("data-z.parquet", 10)], + )], + None, + ) + .await + .unwrap(); + + let snapshot2 = latest_snapshot(&file_io, table_path).await; + commit + .provide_overwrite_entries(&mut plan, &snapshot2) + .await + .unwrap(); + assert_eq!( + overwrite_plan_counts(&plan), + (2, 1), + "non-APPEND snapshots between retries cannot reuse overwrite cache" + ); + } + + #[tokio::test] + async fn test_whole_table_overwrite_never_uses_delta_probe_cache() { + let file_io = test_file_io(); + let table_path = "memory:/test_whole_table_overwrite_cache"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + commit + .commit(vec![ + CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + ), + CommitMessage::new( + partition_bytes("b"), + 0, + vec![test_data_file("data-b.parquet", 200)], + ), + ]) + .await + .unwrap(); + + let new_entries = commit.messages_to_entries(&[CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a2.parquet", 50)], + )]); + let mut plan = overwrite_plan(None, new_entries); + + let snapshot1 = latest_snapshot(&file_io, table_path).await; + commit + .provide_overwrite_entries(&mut plan, &snapshot1) + .await + .unwrap(); + + commit + .commit(vec![CommitMessage::new( + partition_bytes("z"), + 0, + vec![test_data_file("data-z.parquet", 10)], + )]) + .await + .unwrap(); + + let snapshot2 = latest_snapshot(&file_io, table_path).await; + let second = commit + .provide_overwrite_entries(&mut plan, &snapshot2) + .await + .unwrap(); + assert_eq!( + overwrite_plan_counts(&plan), + (2, 0), + "whole-table overwrite has no target predicate, so it must full-scan each retry" + ); + assert!(second.iter().any(|entry| { + *entry.kind() == FileKind::Delete && entry.file().file_name == "data-z.parquet" + })); + } + #[tokio::test] async fn test_dynamic_overwrite_ignores_changelog_only_message() { let file_io = test_file_io(); @@ -1933,6 +3824,51 @@ mod tests { assert_eq!(snapshot.total_record_count(), Some(200)); } + #[tokio::test] + async fn test_drop_partitions_empty_list_rejected() { + let file_io = test_file_io(); + let table_path = "memory:/test_drop_partitions_empty"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + let result = commit.drop_partitions(vec![]).await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Partitions list cannot be empty")); + } + + #[tokio::test] + async fn test_truncate_missing_partition_is_noop() { + let file_io = test_file_io(); + let table_path = "memory:/test_truncate_missing_partition"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + commit + .commit(vec![CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + )]) + .await + .unwrap(); + + commit + .truncate_partitions(vec![HashMap::from([( + "pt".to_string(), + Some(Datum::String("missing".to_string())), + )])]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.total_record_count(), Some(100)); + } + fn null_partition_bytes() -> Vec { let mut builder = BinaryRowBuilder::new(1); builder.set_null_at(0); @@ -1947,22 +3883,122 @@ mod tests { .option("row-tracking.enabled", "true") .build() .unwrap(); - TableSchema::new(0, &schema) - } + TableSchema::new(0, &schema) + } + + fn test_data_evolution_schema() -> TableSchema { + use crate::spec::{DataType, IntType, Schema, VarCharType}; + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .option("row-tracking.enabled", "true") + .option("data-evolution.enabled", "true") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_row_tracking_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_row_tracking_schema(), + None, + ) + } + + fn test_data_evolution_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_data_evolution_schema(), + None, + ) + } + + fn setup_row_tracking_commit(file_io: &FileIO, table_path: &str) -> TableCommit { + let table = test_row_tracking_table(file_io, table_path); + TableCommit::new(table, "test-user".to_string()) + } + + fn setup_data_evolution_commit(file_io: &FileIO, table_path: &str) -> TableCommit { + let table = test_data_evolution_table(file_io, table_path); + TableCommit::new(table, "test-user".to_string()) + } + + #[tokio::test] + async fn test_row_tracking_rejects_missing_file_source() { + let file_io = test_file_io(); + let table_path = "memory:/test_row_tracking_missing_file_source"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_row_tracking_commit(&file_io, table_path); + let file = test_data_file("data-0.parquet", 10); + + let result = commit + .commit(vec![CommitMessage::new(vec![], 0, vec![file])]) + .await; + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("file_source must be present"), + "expected file_source error, got: {err_msg}" + ); + } + + #[tokio::test] + async fn test_row_tracking_assigns_vector_store_files_from_current_data_start() { + let file_io = test_file_io(); + let table_path = "memory:/test_row_tracking_vector_store"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_row_tracking_commit(&file_io, table_path); + let mut data_file = test_data_file("data-0.parquet", 10); + data_file.file_source = Some(0); + let mut vector_file = test_data_file("data-0.vector.vortex", 10); + vector_file.file_source = Some(0); + vector_file.write_cols = Some(vec!["name".to_string()]); + + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![data_file, vector_file], + )]) + .await + .unwrap(); - fn test_row_tracking_table(file_io: &FileIO, table_path: &str) -> Table { - Table::new( - file_io.clone(), - Identifier::new("default", "test_table"), - table_path.to_string(), - test_row_tracking_schema(), - None, - ) - } + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.next_row_id(), Some(10)); - fn setup_row_tracking_commit(file_io: &FileIO, table_path: &str) -> TableCommit { - let table = test_row_tracking_table(file_io, table_path); - TableCommit::new(table, "test-user".to_string()) + let delta_metas = ManifestList::read( + &file_io, + &format!("{table_path}/manifest/{}", snapshot.delta_manifest_list()), + ) + .await + .unwrap(); + assert_eq!(delta_metas[0].min_row_id(), Some(0)); + assert_eq!(delta_metas[0].max_row_id(), Some(9)); + let entries = Manifest::read( + &file_io, + &format!("{table_path}/manifest/{}", delta_metas[0].file_name()), + ) + .await + .unwrap(); + let data = entries + .iter() + .find(|entry| entry.file().file_name == "data-0.parquet") + .unwrap(); + let vector = entries + .iter() + .find(|entry| entry.file().file_name == "data-0.vector.vortex") + .unwrap(); + assert_eq!(data.file().first_row_id, Some(0)); + assert_eq!(vector.file().first_row_id, Some(0)); } #[tokio::test] @@ -2057,6 +4093,97 @@ mod tests { assert_eq!(snapshot.id(), 2); } + #[tokio::test] + async fn test_check_from_snapshot_rejects_concurrent_same_column_update() { + let file_io = test_file_io(); + let table_path = "memory:/test_check_from_snapshot_same_column"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_data_evolution_commit(&file_io, table_path); + let partition = EMPTY_SERIALIZED_ROW.clone(); + let mut initial_file = test_data_file("data-0.parquet", 100); + initial_file.file_source = Some(0); + commit + .commit(vec![CommitMessage::new( + partition.clone(), + 0, + vec![initial_file], + )]) + .await + .unwrap(); + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + let entries = active_entries(&file_io, table_path, &snapshot).await; + assert_eq!(entries[0].file().first_row_id, Some(0)); + + let mut first_partial = test_data_file("partial-name-a.parquet", 100); + first_partial.first_row_id = Some(0); + first_partial.file_source = Some(0); + first_partial.write_cols = Some(vec!["name".to_string()]); + let mut first_message = CommitMessage::new(partition.clone(), 0, vec![first_partial]); + first_message.check_from_snapshot = Some(1); + commit.commit(vec![first_message]).await.unwrap(); + + let mut second_partial = test_data_file("partial-name-b.parquet", 100); + second_partial.first_row_id = Some(0); + second_partial.file_source = Some(0); + second_partial.write_cols = Some(vec!["name".to_string()]); + let mut second_message = CommitMessage::new(partition, 0, vec![second_partial]); + second_message.check_from_snapshot = Some(1); + + let result = commit.commit(vec![second_message]).await; + + assert!(result.is_err()); + let err_msg = result.unwrap_err().to_string(); + assert!( + err_msg.contains("multiple MERGE INTO operations have encountered conflicts"), + "expected row-id/column conflict, got: {err_msg}" + ); + } + + #[tokio::test] + async fn test_check_from_snapshot_allows_concurrent_different_column_update() { + let file_io = test_file_io(); + let table_path = "memory:/test_check_from_snapshot_different_column"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_data_evolution_commit(&file_io, table_path); + let partition = EMPTY_SERIALIZED_ROW.clone(); + let mut initial_file = test_data_file("data-0.parquet", 100); + initial_file.file_source = Some(0); + commit + .commit(vec![CommitMessage::new( + partition.clone(), + 0, + vec![initial_file], + )]) + .await + .unwrap(); + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + let entries = active_entries(&file_io, table_path, &snapshot).await; + assert_eq!(entries[0].file().first_row_id, Some(0)); + + let mut name_partial = test_data_file("partial-name.parquet", 100); + name_partial.first_row_id = Some(0); + name_partial.file_source = Some(0); + name_partial.write_cols = Some(vec!["name".to_string()]); + let mut name_message = CommitMessage::new(partition.clone(), 0, vec![name_partial]); + name_message.check_from_snapshot = Some(1); + commit.commit(vec![name_message]).await.unwrap(); + + let mut id_partial = test_data_file("partial-id.parquet", 100); + id_partial.first_row_id = Some(0); + id_partial.file_source = Some(0); + id_partial.write_cols = Some(vec!["id".to_string()]); + let mut id_message = CommitMessage::new(partition, 0, vec![id_partial]); + id_message.check_from_snapshot = Some(1); + + commit.commit(vec![id_message]).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 3); + } + #[tokio::test] async fn test_row_id_conflict_no_snapshot_rejects() { // Committing a file with pre-assigned first_row_id when no snapshot exists @@ -2139,6 +4266,89 @@ mod tests { assert_eq!(snapshot.total_record_count(), Some(350)); } + #[tokio::test] + async fn test_static_overwrite_default_partition_name_treated_as_null() { + let file_io = test_file_io(); + let table_path = "memory:/test_static_overwrite_default_partition"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + commit + .commit(vec![ + CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + ), + CommitMessage::new( + null_partition_bytes(), + 0, + vec![test_data_file("data-null.parquet", 300)], + ), + ]) + .await + .unwrap(); + + commit + .overwrite( + vec![CommitMessage::new( + null_partition_bytes(), + 0, + vec![test_data_file("data-null2.parquet", 50)], + )], + Some(HashMap::from([( + "pt".to_string(), + Some(Datum::String("__DEFAULT_PARTITION__".to_string())), + )])), + ) + .await + .unwrap(); + + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + assert_eq!(snapshot.id(), 2); + assert_eq!(snapshot.total_record_count(), Some(150)); + let active_file_names = active_entries(&file_io, table_path, &snapshot) + .await + .into_iter() + .map(|entry| entry.file().file_name.clone()) + .collect::>(); + assert_eq!( + active_file_names, + HashSet::from([ + "data-a.parquet".to_string(), + "data-null2.parquet".to_string() + ]) + ); + } + + #[tokio::test] + async fn test_static_overwrite_rejects_mismatched_message_partition() { + let file_io = test_file_io(); + let table_path = "memory:/test_static_overwrite_mismatch"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + let result = commit + .overwrite( + vec![CommitMessage::new( + partition_bytes("b"), + 0, + vec![test_data_file("data-b.parquet", 100)], + )], + Some(HashMap::from([( + "pt".to_string(), + Some(Datum::String("a".to_string())), + )])), + ) + .await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("does not belong to this partition")); + } + #[tokio::test] async fn test_overwrite_ignores_changelog_files() { let file_io = test_file_io(); @@ -2159,6 +4369,64 @@ mod tests { assert_eq!(snapshot.changelog_manifest_list(), None); } + #[tokio::test] + async fn test_commit_writes_changelog_manifest_list_size() { + let file_io = test_file_io(); + let table_path = "memory:/test_changelog_manifest_list_size"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let mut message = CommitMessage::new(vec![], 0, vec![test_data_file("data.parquet", 10)]); + message.new_changelog_files = vec![test_data_file("changelog.parquet", 3)]; + commit.commit(vec![message]).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.changelog_record_count(), Some(3)); + assert!(snapshot.changelog_manifest_list().is_some()); + assert!(snapshot.changelog_manifest_list_size().unwrap() > 0); + } + + #[tokio::test] + async fn test_abort_deletes_new_data_and_changelog_files() { + let file_io = test_file_io(); + let table_path = "memory:/test_abort_cleanup"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + let bucket_dir = format!("{table_path}/bucket-0"); + file_io.mkdirs(&format!("{bucket_dir}/")).await.unwrap(); + + let mut data_file = test_data_file("data.parquet", 10); + data_file.extra_files = vec!["data.parquet.index".to_string()]; + let changelog_file = test_data_file("changelog.parquet", 3); + for name in ["data.parquet", "data.parquet.index", "changelog.parquet"] { + file_io + .new_output(&format!("{bucket_dir}/{name}")) + .unwrap() + .write(bytes::Bytes::from_static(b"x")) + .await + .unwrap(); + } + + let mut message = CommitMessage::new(vec![], 0, vec![data_file]); + message.new_changelog_files = vec![changelog_file]; + commit.abort(&[message]).await.unwrap(); + + assert!(!file_io + .exists(&format!("{bucket_dir}/data.parquet")) + .await + .unwrap()); + assert!(!file_io + .exists(&format!("{bucket_dir}/data.parquet.index")) + .await + .unwrap()); + assert!(!file_io + .exists(&format!("{bucket_dir}/changelog.parquet")) + .await + .unwrap()); + } + #[tokio::test] async fn test_delete_conflict_rejects_missing_file() { let file_io = test_file_io(); @@ -2315,6 +4583,215 @@ mod tests { assert_eq!(stats.null_counts(), &vec![Some(1)]); } + #[tokio::test] + async fn test_manifest_files_roll_by_target_size_and_preserve_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_manifest_rolling"; + setup_dirs(&file_io, table_path).await; + + let table = test_table_with_options( + &file_io, + table_path, + HashMap::from([("manifest.target-file-size".to_string(), "1 kb".to_string())]), + ); + let commit = TableCommit::new(table, "test-user".to_string()); + + let messages = (0..2500) + .map(|i| { + let mut file = test_data_file(&format!("data-{i:04}.parquet"), 1); + file.extra_files = (0..8).map(|j| format!("data-{i:04}-{j}.idx")).collect(); + CommitMessage::new(vec![], 0, vec![file]) + }) + .collect::>(); + commit.commit(messages).await.unwrap(); + + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + let manifest_dir = format!("{table_path}/manifest"); + let delta_manifest_list_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); + let delta_manifest_list_bytes = file_io + .new_input(&delta_manifest_list_path) + .unwrap() + .read() + .await + .unwrap(); + assert!( + contains_bytes(&delta_manifest_list_bytes, b"zstandard"), + "manifest lists should use the default zstd Avro codec" + ); + let delta_metas = ManifestList::read(&file_io, &delta_manifest_list_path) + .await + .unwrap(); + assert!( + delta_metas.len() > 1, + "small manifest target should roll into multiple manifest files" + ); + assert_eq!( + delta_metas + .iter() + .map(|meta| meta.num_added_files() + meta.num_deleted_files()) + .sum::(), + 2500 + ); + for meta in &delta_metas[..delta_metas.len() - 1] { + assert!( + meta.file_size() >= 1024, + "rolled manifest files should not be smaller than target" + ); + } + + let mut file_names = HashSet::new(); + for meta in &delta_metas { + let manifest_path = format!("{manifest_dir}/{}", meta.file_name()); + let manifest_bytes = file_io + .new_input(&manifest_path) + .unwrap() + .read() + .await + .unwrap(); + assert!( + contains_bytes(&manifest_bytes, b"zstandard"), + "manifest files should use the default zstd Avro codec" + ); + assert_eq!( + file_io.get_status(&manifest_path).await.unwrap().size as i64, + meta.file_size() + ); + let entries = Manifest::read(&file_io, &manifest_path).await.unwrap(); + assert_eq!( + entries.len() as i64, + meta.num_added_files() + meta.num_deleted_files() + ); + for entry in entries { + file_names.insert(entry.file().file_name.clone()); + } + } + assert_eq!(file_names.len(), 2500); + assert!(file_names.contains("data-0000.parquet")); + assert!(file_names.contains("data-2499.parquet")); + } + + #[tokio::test] + async fn test_manifest_rolling_waits_for_java_check_cadence() { + let file_io = test_file_io(); + let table_path = "memory:/test_manifest_rolling_cadence"; + setup_dirs(&file_io, table_path).await; + + let table = test_table_with_options( + &file_io, + table_path, + HashMap::from([("manifest.target-file-size".to_string(), "1 kb".to_string())]), + ); + let commit = TableCommit::new(table, "test-user".to_string()); + + let messages = (0..80) + .map(|i| { + let mut file = test_data_file(&format!("data-{i:03}.parquet"), 1); + file.extra_files = (0..8).map(|j| format!("data-{i:03}-{j}.idx")).collect(); + CommitMessage::new(vec![], 0, vec![file]) + }) + .collect::>(); + commit.commit(messages).await.unwrap(); + + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + let manifest_dir = format!("{table_path}/manifest"); + let delta_metas = ManifestList::read( + &file_io, + &format!("{manifest_dir}/{}", snapshot.delta_manifest_list()), + ) + .await + .unwrap(); + assert_eq!( + delta_metas.len(), + 1, + "manifest rolling should not check before Java's 1000-record cadence" + ); + } + + fn contains_bytes(haystack: &[u8], needle: &[u8]) -> bool { + haystack + .windows(needle.len()) + .any(|window| window == needle) + } + + #[tokio::test] + async fn test_minor_compaction_nets_add_delete_manifest_entries() { + let file_io = test_file_io(); + let table_path = "memory:/test_minor_manifest_compaction"; + setup_dirs(&file_io, table_path).await; + + let table = test_table_with_options( + &file_io, + table_path, + HashMap::from([("manifest.merge-min-count".to_string(), "2".to_string())]), + ); + let commit = TableCommit::new(table, "test-user".to_string()); + + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-0.parquet", 100)], + )]) + .await + .unwrap(); + + commit + .overwrite( + vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-1.parquet", 50)], + )], + None, + ) + .await + .unwrap(); + + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-2.parquet", 25)], + )]) + .await + .unwrap(); + + let snapshot = latest_snapshot(&file_io, table_path).await.unwrap(); + assert_eq!(snapshot.id(), 3); + let manifest_dir = format!("{table_path}/manifest"); + let base_metas = ManifestList::read( + &file_io, + &format!("{manifest_dir}/{}", snapshot.base_manifest_list()), + ) + .await + .unwrap(); + assert_eq!( + base_metas.len(), + 1, + "two previous manifest files should be minor-compacted into one base manifest" + ); + + let base_entries = Manifest::read( + &file_io, + &format!("{manifest_dir}/{}", base_metas[0].file_name()), + ) + .await + .unwrap(); + assert_eq!(base_entries.len(), 1); + assert_eq!(*base_entries[0].kind(), FileKind::Add); + assert_eq!(base_entries[0].file().file_name, "data-1.parquet"); + + let active_file_names = active_entries(&file_io, table_path, &snapshot) + .await + .into_iter() + .map(|entry| entry.file().file_name.clone()) + .collect::>(); + assert_eq!( + active_file_names, + HashSet::from(["data-1.parquet".to_string(), "data-2.parquet".to_string()]) + ); + } + /// `write_manifest_file` must aggregate min/max bucket and level across entries so the /// Java reader can prune manifests by bucket / level (see apache/paimon#5345). This /// drives a real commit so all the call-site plumbing is exercised end to end.