Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled";
const DATA_EVOLUTION_ENABLED_OPTION: &str = "data-evolution.enabled";
const GLOBAL_INDEX_ENABLED_OPTION: &str = "global-index.enabled";
const GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION: &str = "global-index.row-count-per-shard";
const GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION: &str = "global-index.column-update-action";
const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size";
const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
Expand All @@ -46,6 +47,10 @@ const CHANGELOG_FILE_FORMAT_OPTION: &str = "changelog-file.format";
const CHANGELOG_FILE_COMPRESSION_OPTION: &str = "changelog-file.compression";
const CHANGELOG_FILE_STATS_MODE_OPTION: &str = "changelog-file.stats-mode";
const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled";
const MANIFEST_COMPRESSION_OPTION: &str = "manifest.compression";
const MANIFEST_TARGET_FILE_SIZE_OPTION: &str = "manifest.target-file-size";
const MANIFEST_TARGET_SIZE_OPTION: &str = "manifest.target-size";
const MANIFEST_MERGE_MIN_COUNT_OPTION: &str = "manifest.merge-min-count";
const WRITE_PARQUET_BUFFER_SIZE_OPTION: &str = "write.parquet-buffer-size";
pub(crate) const SEQUENCE_FIELD_OPTION: &str = "sequence.field";
pub(crate) const DISABLE_EXPLICIT_TYPE_CASTING_OPTION: &str = "disable-explicit-type-casting";
Expand All @@ -62,6 +67,9 @@ pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
pub const SCAN_VERSION_OPTION: &str = "scan.version";
const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
const DEFAULT_MANIFEST_COMPRESSION: &str = "zstd";
const DEFAULT_MANIFEST_TARGET_FILE_SIZE: i64 = 8 * 1024 * 1024;
const DEFAULT_MANIFEST_MERGE_MIN_COUNT: usize = 30;
const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
const DEFAULT_CHANGELOG_FILE_PREFIX: &str = "changelog-";
const DEFAULT_TARGET_FILE_SIZE: i64 = 256 * 1024 * 1024;
Expand Down Expand Up @@ -102,6 +110,13 @@ pub enum ChangelogProducer {
Lookup,
}

/// Action when a partial-column update touches globally indexed columns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GlobalIndexColumnUpdateAction {
ThrowError,
DropPartitionIndex,
}

/// Bucket function used to map bucket keys to fixed bucket ids.
///
/// Reference: Java `CoreOptions.BucketFunctionType`.
Expand Down Expand Up @@ -263,6 +278,24 @@ impl<'a> CoreOptions<'a> {
Ok(value)
}

pub fn global_index_column_update_action(
&self,
) -> crate::Result<GlobalIndexColumnUpdateAction> {
match self
.options
.get(GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION)
.map(|v| v.to_ascii_uppercase())
.as_deref()
.unwrap_or("THROW_ERROR")
{
"THROW_ERROR" => Ok(GlobalIndexColumnUpdateAction::ThrowError),
"DROP_PARTITION_INDEX" => Ok(GlobalIndexColumnUpdateAction::DropPartitionIndex),
other => Err(crate::Error::ConfigInvalid {
message: format!("Unsupported global-index.column-update-action: {other}"),
}),
}
}

pub fn source_split_target_size(&self) -> i64 {
self.options
.get(SOURCE_SPLIT_TARGET_SIZE_OPTION)
Expand Down Expand Up @@ -400,6 +433,29 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(false)
}

/// Suggested target size for a manifest file. Default is 8 MiB.
///
/// `manifest.target-file-size` is the Java/Python option. The shorter
/// `manifest.target-size` alias is accepted for older Rust callers that
/// used the name from early parity discussions.
pub fn manifest_target_size(&self) -> i64 {
self.options
.get(MANIFEST_TARGET_FILE_SIZE_OPTION)
.or_else(|| self.options.get(MANIFEST_TARGET_SIZE_OPTION))
.and_then(|v| parse_memory_size(v))
.unwrap_or(DEFAULT_MANIFEST_TARGET_FILE_SIZE)
}

/// Minimum number of small manifest files required before minor manifest
/// compaction rewrites them into a new rolling manifest set.
pub fn manifest_merge_min_count(&self) -> usize {
self.options
.get(MANIFEST_MERGE_MIN_COUNT_OPTION)
.and_then(|v| v.parse().ok())
.filter(|v| *v > 0)
.unwrap_or(DEFAULT_MANIFEST_MERGE_MIN_COUNT)
}

/// Number of buckets for the table. Default is 1.
pub fn bucket(&self) -> i32 {
self.options
Expand Down Expand Up @@ -505,6 +561,15 @@ impl<'a> CoreOptions<'a> {
.map(String::as_str)
}

/// Avro compression codec for manifest, manifest-list and index-manifest files.
/// Default is `"zstd"`, matching Java Paimon `CoreOptions.MANIFEST_COMPRESSION`.
pub fn manifest_compression(&self) -> &str {
self.options
.get(MANIFEST_COMPRESSION_OPTION)
.map(String::as_str)
.unwrap_or(DEFAULT_MANIFEST_COMPRESSION)
}

/// Parquet writer in-progress buffer size limit. Default is 256MB.
/// When the buffered data exceeds this, the writer flushes the current row group.
pub fn write_parquet_buffer_size(&self) -> i64 {
Expand Down Expand Up @@ -588,6 +653,10 @@ mod tests {
core_options.global_index_row_count_per_shard().unwrap(),
100_000
);
assert_eq!(
core_options.global_index_column_update_action().unwrap(),
GlobalIndexColumnUpdateAction::ThrowError
);
}

#[test]
Expand All @@ -605,6 +674,10 @@ mod tests {
GLOBAL_INDEX_ROW_COUNT_PER_SHARD_OPTION.to_string(),
"2048".to_string(),
),
(
GLOBAL_INDEX_COLUMN_UPDATE_ACTION_OPTION.to_string(),
"DROP_PARTITION_INDEX".to_string(),
),
]);
let core_options = CoreOptions::new(&options);

Expand All @@ -614,6 +687,10 @@ mod tests {
core_options.global_index_row_count_per_shard().unwrap(),
2048
);
assert_eq!(
core_options.global_index_column_update_action().unwrap(),
GlobalIndexColumnUpdateAction::DropPartitionIndex
);
}

#[test]
Expand Down Expand Up @@ -815,6 +892,9 @@ mod tests {
assert_eq!(core.commit_min_retry_wait_ms(), 1_000);
assert_eq!(core.commit_max_retry_wait_ms(), 10_000);
assert!(!core.row_tracking_enabled());
assert_eq!(core.manifest_compression(), "zstd");
assert_eq!(core.manifest_target_size(), 8 * 1024 * 1024);
assert_eq!(core.manifest_merge_min_count(), 30);
}

#[test]
Expand All @@ -826,6 +906,12 @@ mod tests {
(COMMIT_MIN_RETRY_WAIT_OPTION.to_string(), "500".to_string()),
(COMMIT_MAX_RETRY_WAIT_OPTION.to_string(), "5000".to_string()),
(ROW_TRACKING_ENABLED_OPTION.to_string(), "true".to_string()),
(
MANIFEST_TARGET_FILE_SIZE_OPTION.to_string(),
"1kb".to_string(),
),
(MANIFEST_COMPRESSION_OPTION.to_string(), "null".to_string()),
(MANIFEST_MERGE_MIN_COUNT_OPTION.to_string(), "3".to_string()),
]);
let core = CoreOptions::new(&options);
assert_eq!(core.bucket(), 4);
Expand All @@ -834,6 +920,17 @@ mod tests {
assert_eq!(core.commit_min_retry_wait_ms(), 500);
assert_eq!(core.commit_max_retry_wait_ms(), 5_000);
assert!(core.row_tracking_enabled());
assert_eq!(core.manifest_compression(), "null");
assert_eq!(core.manifest_target_size(), 1024);
assert_eq!(core.manifest_merge_min_count(), 3);
}

#[test]
fn test_manifest_target_size_accepts_compat_alias() {
let options = HashMap::from([(MANIFEST_TARGET_SIZE_OPTION.to_string(), "2kb".into())]);
let core = CoreOptions::new(&options);

assert_eq!(core.manifest_target_size(), 2 * 1024);
}

#[test]
Expand Down
22 changes: 21 additions & 1 deletion crates/paimon/src/spec/index_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,27 @@ impl IndexManifest {

/// Write index manifest entries to a file.
pub async fn write(file_io: &FileIO, path: &str, entries: &[IndexManifestEntry]) -> Result<()> {
let bytes = crate::spec::to_avro_bytes(INDEX_MANIFEST_ENTRY_SCHEMA, entries)?;
Self::write_with_compression(
file_io,
path,
entries,
crate::spec::DEFAULT_AVRO_COMPRESSION,
)
.await
}

/// Write index manifest entries with the configured Avro compression.
pub async fn write_with_compression(
file_io: &FileIO,
path: &str,
entries: &[IndexManifestEntry],
compression: &str,
) -> Result<()> {
let bytes = crate::spec::to_avro_bytes_with_compression(
INDEX_MANIFEST_ENTRY_SCHEMA,
entries,
compression,
)?;
let output = file_io.new_output(path)?;
output.write(bytes::Bytes::from(bytes)).await
}
Expand Down
22 changes: 21 additions & 1 deletion crates/paimon/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,27 @@ impl Manifest {

/// Write manifest entries to a file.
pub async fn write(file_io: &FileIO, path: &str, entries: &[ManifestEntry]) -> Result<()> {
let bytes = crate::spec::to_avro_bytes(MANIFEST_ENTRY_SCHEMA, entries)?;
Self::write_with_compression(
file_io,
path,
entries,
crate::spec::DEFAULT_AVRO_COMPRESSION,
)
.await
}

/// Write manifest entries to a file with the configured Avro compression.
pub async fn write_with_compression(
file_io: &FileIO,
path: &str,
entries: &[ManifestEntry],
compression: &str,
) -> Result<()> {
let bytes = crate::spec::to_avro_bytes_with_compression(
MANIFEST_ENTRY_SCHEMA,
entries,
compression,
)?;
let output = file_io.new_output(path)?;
output.write(bytes::Bytes::from(bytes)).await
}
Expand Down
9 changes: 9 additions & 0 deletions crates/paimon/src/spec/manifest_file_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ impl ManifestFileMeta {
self
}

/// Attach row-id range statistics aggregated from manifest entries.
#[inline]
#[must_use]
pub fn with_row_id_stats(mut self, min_row_id: Option<i64>, max_row_id: Option<i64>) -> Self {
self.min_row_id = min_row_id;
self.max_row_id = max_row_id;
self
}

#[inline]
pub fn new(
file_name: String,
Expand Down
17 changes: 16 additions & 1 deletion crates/paimon/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,22 @@ impl ManifestList {

/// Write manifest file metas to a manifest list file.
pub async fn write(file_io: &FileIO, path: &str, metas: &[ManifestFileMeta]) -> Result<()> {
let bytes = crate::spec::to_avro_bytes(MANIFEST_FILE_META_SCHEMA, metas)?;
Self::write_with_compression(file_io, path, metas, crate::spec::DEFAULT_AVRO_COMPRESSION)
.await
}

/// Write manifest file metas with the configured Avro compression.
pub async fn write_with_compression(
file_io: &FileIO,
path: &str,
metas: &[ManifestFileMeta],
compression: &str,
) -> Result<()> {
let bytes = crate::spec::to_avro_bytes_with_compression(
MANIFEST_FILE_META_SCHEMA,
metas,
compression,
)?;
let output = file_io.new_output(path)?;
output.write(bytes::Bytes::from(bytes)).await
}
Expand Down
5 changes: 5 additions & 0 deletions crates/paimon/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,16 @@ pub use manifest_common::FileKind;
mod manifest_entry;
pub use manifest_entry::Identifier;
pub use manifest_entry::ManifestEntry;
pub(crate) use manifest_entry::MANIFEST_ENTRY_SCHEMA;
mod manifest_list;
pub use manifest_list::ManifestList;
mod objects_file;
pub use objects_file::from_avro_bytes;
pub use objects_file::to_avro_bytes;
pub(crate) use objects_file::{
new_avro_writer, to_avro_bytes_with_compression, DEFAULT_AVRO_BLOCK_SIZE,
DEFAULT_AVRO_COMPRESSION,
};
pub(crate) mod avro;
pub(crate) mod stats;
mod types;
Expand Down
41 changes: 38 additions & 3 deletions crates/paimon/src/spec/objects_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use apache_avro::{from_value, to_value, Codec, Reader, Schema, Writer};
use apache_avro::{from_value, to_value, Codec, Reader, Schema, Writer, ZstandardSettings};
use serde::de::DeserializeOwned;
use serde::Serialize;

pub(crate) const DEFAULT_AVRO_BLOCK_SIZE: usize = 16_000;
pub(crate) const DEFAULT_AVRO_COMPRESSION: &str = "zstd";

pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> crate::Result<Vec<T>> {
Reader::new(bytes)?
.map(|r| {
Expand All @@ -33,15 +36,47 @@ pub fn from_avro_bytes<T: DeserializeOwned>(bytes: &[u8]) -> crate::Result<Vec<T
/// The `schema_json` must be a valid Avro schema JSON string that matches
/// the serde serialization layout of `T`.
pub fn to_avro_bytes<T: Serialize>(schema_json: &str, records: &[T]) -> crate::Result<Vec<u8>> {
to_avro_bytes_with_compression(schema_json, records, DEFAULT_AVRO_COMPRESSION)
}

pub(crate) fn to_avro_bytes_with_compression<T: Serialize>(
schema_json: &str,
records: &[T],
compression: &str,
) -> crate::Result<Vec<u8>> {
let schema = Schema::parse_str(schema_json)?;
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
let mut writer = new_avro_writer(&schema, compression, DEFAULT_AVRO_BLOCK_SIZE)?;
for record in records {
let value = to_value(record).and_then(|v| v.resolve(&schema))?;
let value = to_value(record).and_then(|value| value.resolve(&schema))?;
writer.append(value)?;
}
Ok(writer.into_inner()?)
}

pub(crate) fn new_avro_writer<'a>(
schema: &'a Schema,
compression: &str,
block_size: usize,
) -> crate::Result<Writer<'a, Vec<u8>>> {
Ok(Writer::builder()
.schema(schema)
.writer(Vec::new())
.codec(avro_codec(compression)?)
.block_size(block_size.max(1))
.build())
}

pub(crate) fn avro_codec(compression: &str) -> crate::Result<Codec> {
match compression.to_ascii_lowercase().as_str() {
"zstd" | "zstandard" => Ok(Codec::Zstandard(ZstandardSettings::default())),
"null" | "none" | "uncompressed" => Ok(Codec::Null),
"snappy" => Ok(Codec::Snappy),
other => Err(crate::Error::Unsupported {
message: format!("Unsupported Avro compression: {other}"),
}),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
10 changes: 10 additions & 0 deletions crates/paimon/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub struct Snapshot {
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
changelog_manifest_list: Option<String>,
/// byte size of changelog manifest list file
#[builder(default = None)]
#[serde(default, skip_serializing_if = "Option::is_none")]
changelog_manifest_list_size: Option<i64>,
/// a manifest recording all index files of this table
#[builder(default = None)]
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -150,6 +154,12 @@ impl Snapshot {
self.changelog_manifest_list.as_deref()
}

/// Get the byte size of changelog manifest list of this snapshot.
#[inline]
pub fn changelog_manifest_list_size(&self) -> Option<i64> {
self.changelog_manifest_list_size
}

/// Get the index manifest of this snapshot.
#[inline]
pub fn index_manifest(&self) -> Option<&str> {
Expand Down
Loading
Loading