Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use parquet::file::serialized_reader::ReadOptionsBuilder;
const NUM_COLUMNS: usize = 10_000;
const NUM_ROW_GROUPS: usize = 10;

fn encoded_meta(is_nullable: bool, has_lists: bool) -> Vec<u8> {
fn encoded_meta(is_nullable: bool, has_lists: bool, write_path_in_schema: bool) -> Vec<u8> {
let mut rng = seedable_rng();

let mut column_desc_ptrs: Vec<ColumnDescPtr> = Vec::with_capacity(NUM_COLUMNS);
Expand Down Expand Up @@ -143,7 +143,11 @@ fn encoded_meta(is_nullable: bool, has_lists: bool) -> Vec<u8> {
let mut buffer = Vec::with_capacity(1024);
{
let buf = TrackedWrite::new(&mut buffer);
let writer = ParquetMetaDataWriter::new_with_tracked(buf, &metadata);
let mut writer = ParquetMetaDataWriter::new_with_tracked(buf, &metadata);
// use defaults unless `write_path_in_schema` is false
if !write_path_in_schema {
writer = writer.with_write_path_in_schema(write_path_in_schema);
}
writer.finish().unwrap();
}

Expand Down Expand Up @@ -233,7 +237,7 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let buf: Bytes = black_box(encoded_meta(false, false)).into();
let buf: Bytes = black_box(encoded_meta(false, false, true)).into();
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
Expand Down Expand Up @@ -275,7 +279,15 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let buf: Bytes = black_box(encoded_meta(true, true)).into();
let buf: Bytes = black_box(encoded_meta(false, false, false)).into();
let options = ParquetMetaDataOptions::new().with_encoding_stats_as_mask(false);
c.bench_function("decode parquet metadata no path_in_schema (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
})
});

let buf: Bytes = black_box(encoded_meta(true, true, true)).into();
c.bench_function("decode parquet metadata w/ size stats (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
Expand Down
38 changes: 38 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4580,6 +4580,44 @@ mod tests {
}
}

#[test]
fn test_arrow_writer_skip_path_in_schema() {
let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
let file_schema = Arc::new(batch_schema.clone());

let batch = RecordBatch::try_new(
Arc::new(batch_schema),
vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
)
.unwrap();

// default options should still write path_in_schema
let skip_options = ArrowWriterOptions::new();

let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new_with_options(&mut buf, file_schema.clone(), skip_options).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

// override to not write path_in_schema
let skip_options = ArrowWriterOptions::new().with_properties(
WriterProperties::builder()
.set_write_path_in_schema(false)
.build(),
);

let mut buf2 = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new_with_options(&mut buf2, file_schema.clone(), skip_options)
.unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

// buf2 should be a bit smaller due to lack of path_in_schema
assert!(buf.len() > buf2.len());
}

#[test]
fn mismatched_schemas() {
let batch_schema = Schema::new(vec![Field::new("count", DataType::Int32, false)]);
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/bin/parquet-rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ struct Args {
#[clap(long)]
write_page_header_statistics: Option<bool>,

/// Write path_in_schema to the column metadata.
#[clap(long)]
write_path_in_schema: Option<bool>,

/// Sets whether bloom filter is enabled for all columns.
#[clap(long)]
bloom_filter_enabled: Option<bool>,
Expand Down Expand Up @@ -406,6 +410,9 @@ fn main() {
if let Some(value) = args.coerce_types {
writer_properties_builder = writer_properties_builder.set_coerce_types(value);
}
if let Some(value) = args.write_path_in_schema {
writer_properties_builder = writer_properties_builder.set_write_path_in_schema(value);
}
if let Some(value) = args.write_batch_size {
writer_properties_builder = writer_properties_builder.set_write_batch_size(value);
}
Expand Down
17 changes: 13 additions & 4 deletions parquet/src/file/metadata/thrift/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,10 +1333,15 @@ pub(super) fn serialize_column_meta_data<W: Write>(
.encodings()
.collect::<Vec<_>>()
.write_thrift_field(w, 2, 1)?;
let path = column_chunk.column_descr.path().parts();
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
path.write_thrift_field(w, 3, 2)?;
column_chunk.compression.write_thrift_field(w, 4, 3)?;
if w.write_path_in_schema() {
let path = column_chunk.column_descr.path().parts();
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
path.write_thrift_field(w, 3, 2)?;
column_chunk.compression.write_thrift_field(w, 4, 3)?;
} else {
column_chunk.compression.write_thrift_field(w, 4, 2)?;
}

column_chunk.num_values.write_thrift_field(w, 5, 4)?;
column_chunk
.total_uncompressed_size
Expand Down Expand Up @@ -1406,6 +1411,8 @@ pub(super) fn serialize_column_meta_data<W: Write>(
pub(super) struct FileMeta<'a> {
pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
pub(super) row_groups: &'a Vec<RowGroupMetaData>,
// If true, then write the `path_in_schema` field in the ColumnMetaData struct.
pub(super) write_path_in_schema: bool,
}

// struct FileMetaData {
Expand All @@ -1425,6 +1432,8 @@ impl<'a> WriteThrift for FileMeta<'a> {
// needed for last_field_id w/o encryption
#[allow(unused_assignments)]
fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
writer.set_write_path_in_schema(self.write_path_in_schema);

self.file_metadata
.version
.write_thrift_field(writer, 1, 0)?;
Expand Down
21 changes: 20 additions & 1 deletion parquet/src/file/metadata/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
created_by: Option<String>,
object_writer: MetadataObjectWriter,
writer_version: i32,
write_path_in_schema: bool,
}

impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
Expand Down Expand Up @@ -259,6 +260,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
let file_meta = FileMeta {
file_metadata: &file_metadata,
row_groups: &row_groups,
write_path_in_schema: self.write_path_in_schema,
};

// Write file metadata
Expand Down Expand Up @@ -293,6 +295,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
row_groups: Vec<RowGroupMetaData>,
created_by: Option<String>,
writer_version: i32,
write_path_in_schema: bool,
) -> Self {
Self {
buf,
Expand All @@ -304,6 +307,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
created_by,
object_writer: Default::default(),
writer_version,
write_path_in_schema,
}
}

Expand Down Expand Up @@ -415,6 +419,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
pub struct ParquetMetaDataWriter<'a, W: Write> {
buf: TrackedWrite<W>,
metadata: &'a ParquetMetaData,
write_path_in_schema: bool,
}

impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
Expand All @@ -436,7 +441,20 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
///
/// See example on the struct level documentation
pub fn new_with_tracked(buf: TrackedWrite<W>, metadata: &'a ParquetMetaData) -> Self {
Self { buf, metadata }
Self {
buf,
metadata,
write_path_in_schema: true,
}
}

/// Set whether or not to write the `path_in_schema` field in the Thrift `ColumnMetaData`
/// struct.
pub fn with_write_path_in_schema(self, val: bool) -> Self {
Self {
write_path_in_schema: val,
..self
}
}

/// Write the metadata to the buffer
Expand All @@ -460,6 +478,7 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
row_groups,
created_by,
file_metadata.version(),
self.write_path_in_schema,
);

if let Some(column_indexes) = column_indexes {
Expand Down
52 changes: 52 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ pub const DEFAULT_OFFSET_INDEX_DISABLED: bool = false;
pub const DEFAULT_COERCE_TYPES: bool = false;
/// Default value for [`WriterProperties::data_page_v2_compression_ratio_threshold`]
pub const DEFAULT_DATA_PAGE_V2_COMPRESSION_RATIO_THRESHOLD: f64 = 1.0;
/// Default value for [`WriterProperties::write_path_in_schema`]
pub const DEFAULT_WRITE_PATH_IN_SCHEMA: bool = true;
/// Default minimum chunk size for content-defined chunking: 256 KiB.
pub const DEFAULT_CDC_MIN_CHUNK_SIZE: usize = 256 * 1024;
/// Default maximum chunk size for content-defined chunking: 1024 KiB.
Expand Down Expand Up @@ -252,6 +254,7 @@ pub struct WriterProperties {
statistics_truncate_length: Option<usize>,
coerce_types: bool,
content_defined_chunking: Option<CdcOptions>,
write_path_in_schema: bool,
#[cfg(feature = "encryption")]
pub(crate) file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}
Expand Down Expand Up @@ -437,6 +440,14 @@ impl WriterProperties {
self.coerce_types
}

/// Returns `true` if the `path_in_schema` field of the `ColumnMetaData` Thrift struct
/// should be written.
///
/// For more details see [`WriterPropertiesBuilder::set_write_path_in_schema`]
pub fn write_path_in_schema(&self) -> bool {
self.write_path_in_schema
}

/// EXPERIMENTAL: Returns content-defined chunking options, or `None` if CDC is disabled.
///
/// For more details see [`WriterPropertiesBuilder::set_content_defined_chunking`]
Expand Down Expand Up @@ -592,6 +603,7 @@ pub struct WriterPropertiesBuilder {
statistics_truncate_length: Option<usize>,
coerce_types: bool,
content_defined_chunking: Option<CdcOptions>,
write_path_in_schema: bool,
#[cfg(feature = "encryption")]
file_encryption_properties: Option<Arc<FileEncryptionProperties>>,
}
Expand All @@ -616,6 +628,7 @@ impl Default for WriterPropertiesBuilder {
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
coerce_types: DEFAULT_COERCE_TYPES,
content_defined_chunking: None,
write_path_in_schema: DEFAULT_WRITE_PATH_IN_SCHEMA,
#[cfg(feature = "encryption")]
file_encryption_properties: None,
}
Expand Down Expand Up @@ -670,6 +683,7 @@ impl WriterPropertiesBuilder {
statistics_truncate_length: self.statistics_truncate_length,
coerce_types: self.coerce_types,
content_defined_chunking: self.content_defined_chunking,
write_path_in_schema: self.write_path_in_schema,
#[cfg(feature = "encryption")]
file_encryption_properties: self.file_encryption_properties,
}
Expand Down Expand Up @@ -885,6 +899,43 @@ impl WriterPropertiesBuilder {
self
}

/// EXPERIMENTAL: Should the writer emit the `path_in_schema` element of the
/// `ColumnMetaData` Thrift struct. Defaults to `true` via [`DEFAULT_WRITE_PATH_IN_SCHEMA`].
///
/// Because `path_in_schema` is a field on the `ColumnMetaData`, it is repeated
/// `num_columns * num_rowgroups` times. Compounding this is any level of nesting or
/// repetition in the schema. For instance, a top-level list column named `foo` will have
/// a `path_in_schema` of `["foo", "list", "element"]`. A list-of-struct is even worse,
/// because the necessary list wrapping is repeated for each element of the struct. A
/// file with a deeply nested schema and many row groups can have a large percentage of the
/// footer taken up by this field. For example, a file of 38 row groups with a schema containing
/// several lists of structs containing lists had 36% of the footer taken up by `path_in_schema`.
/// Removing this redundant information can greatly speed up footer parsing, which is particularly
/// important in scenarios where one does not wish to read the entire file (e.g. point
/// lookups).
///
/// <div class="warning">
///
/// **WARNING:**
/// Setting this to `false` will break compatibility with Parquet readers that
/// still expect this field to be present. Virtually all Parquet readers (parquet-java,
/// Spark, arrow-cpp, pyarrow, pandas to name a few), with the exception
/// of the one in this crate, expect this field to be present, and will terminate execution
/// if it is not. This will continue to be the case unless/until the Parquet format
/// specification is explicitly changed to allow this field to be missing. As a consquence,
/// users should only set this to `false` if they have verified that any reader(s) they plan
/// to use can tolerate the absence of this field.
///
/// For more context, see [GH-563].
///
/// </div>
///
/// [GH-563]: https://github.com/apache/parquet-format/issues/563
pub fn set_write_path_in_schema(mut self, write_path_in_schema: bool) -> Self {
self.write_path_in_schema = write_path_in_schema;
self
}

/// EXPERIMENTAL: Sets content-defined chunking options, or disables CDC with `None`.
///
/// When enabled, data page boundaries are determined by a rolling hash of the
Expand Down Expand Up @@ -1253,6 +1304,7 @@ impl From<WriterProperties> for WriterPropertiesBuilder {
statistics_truncate_length: props.statistics_truncate_length,
coerce_types: props.coerce_types,
content_defined_chunking: props.content_defined_chunking,
write_path_in_schema: props.write_path_in_schema,
#[cfg(feature = "encryption")]
file_encryption_properties: props.file_encryption_properties,
}
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,14 @@ impl<W: Write + Send> SerializedFileWriter<W> {
let column_indexes = std::mem::take(&mut self.column_indexes);
let offset_indexes = std::mem::take(&mut self.offset_indexes);

let write_path_in_schema = self.props.write_path_in_schema();
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
&self.descr,
row_groups,
Some(self.props.created_by().to_string()),
self.props.writer_version().as_num(),
write_path_in_schema,
);

#[cfg(feature = "encryption")]
Expand Down
19 changes: 18 additions & 1 deletion parquet/src/parquet_thrift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,12 +708,29 @@ where
/// [compact output]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
pub(crate) struct ThriftCompactOutputProtocol<W: Write> {
writer: W,
write_path_in_schema: bool,
}

impl<W: Write> ThriftCompactOutputProtocol<W> {
/// Create a new `ThriftCompactOutputProtocol` wrapping the byte sink `writer`.
pub(crate) fn new(writer: W) -> Self {
Self { writer }
Self {
writer,
write_path_in_schema: true,
}
}

// TODO(ets): at some point there should probably be a properties object
// to control aspects of thrift output. But since this is the only option to date
// I'm choosing a simpler API.
/// Control the writing of the `path_in_schema` element of the `ColumnMetaData`
pub(crate) fn set_write_path_in_schema(&mut self, val: bool) {
self.write_path_in_schema = val;
}

/// Indicate whether or not to emit `path_in_schema`.
pub(crate) fn write_path_in_schema(&self) -> bool {
self.write_path_in_schema
}

/// Write a single byte to the output stream.
Expand Down
Loading