Add V2 batch format with statistics collection#2886
Conversation
af26717 to
836948c
Compare
There was a problem hiding this comment.
Pull request overview
This PR introduces a new V2 Arrow log record batch format that can embed per-column min/max + null-count statistics between the batch header and record payload, enabling future filter pushdown improvements (Issue #2885 / FIP-10).
Changes:
- Add statistics collection/serialization/parsing APIs and implementations (collector/writer/parser + batch-access API).
- Extend Arrow log batch building, reading, and projection logic to account for a V2 layout with an optional statistics section and a
StatisticsLengthheader field. - Add table-level configuration (
table.statistics.columns) utilities and validation, and wire statistics collection into the client write path.
Reviewed changes
Copilot reviewed 32 out of 32 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/main/java/org/apache/fluss/server/kv/wal/ArrowWalBuilder.java | Adapts builder call sites to new statistics-capable Arrow batch builder signature. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java | Validates table statistics configuration on CREATE TABLE. |
| fluss-common/src/test/java/org/apache/fluss/record/TestData.java | Adds schemas/data for statistics-related tests. |
| fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java | Extends batch builder tests to cover V2 + statistics. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTestUtils.java | Adds reusable utilities for generating batches with statistics in tests. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsTest.java | Adds end-to-end tests for statistics extraction/caching across batch impls. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsParserTest.java | Adds tests for statistics parsing/validation helpers. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchStatisticsCollectorTest.java | Adds tests for collector behavior across types/nulls/mappings. |
| fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java | Adds V2 header/offset assertions and stats-aware offsets. |
| fluss-common/src/test/java/org/apache/fluss/config/StatisticsConfigUtilsTest.java | Adds tests for statistics config validation. |
| fluss-common/src/main/java/org/apache/fluss/types/DataTypeChecks.java | Adds isBinaryType helper used by stats config/mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/row/arrow/ArrowWriter.java | Exposes RowType schema via getSchema() for stats wiring. |
| fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java | Adds AlignedRow.from(RowType, InternalRow) conversion helper. |
| fluss-common/src/main/java/org/apache/fluss/record/bytesview/MultiBytesView.java | Enhances builder API (addBytes(BytesView), isEmpty) and improves file-region merging. |
| fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java | Implements V2 batch building with optional embedded statistics + CRC handling changes. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsWriter.java | New: serializes statistics in a compact schema-aware format. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsParser.java | New: parses/validates statistics payloads from multiple memory sources. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatisticsCollector.java | New: collects per-column min/max/null-counts during batch build. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchStatistics.java | New: statistics interface exposed from LogRecordBatch. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java | Adds V2 constants/layout helpers, statistics offsets, and header mutation helper. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java | Adds getStatistics(ReadContext) API to batches. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java | Updates projection to skip stats section for V2 and clear stats header fields. |
| fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java | Adds stats parsing and provides optional “trim stats” BytesView generation for V2. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java | New: zero-copy statistics view with full-schema wrappers + mapping logic. |
| fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java | Updates record decoding offsets for V2 (stats-aware) and implements statistics parsing/caching. |
| fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java | Computes and caches stats column index mappings based on table config. |
| fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java | Adds statistics config accessors and enablement checks. |
| fluss-common/src/main/java/org/apache/fluss/config/StatisticsConfigUtils.java | New: validates table.statistics.columns against schema + supported types. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds table.statistics.columns option definition/documentation. |
| fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java | Updates tests for ArrowLogWriteBatch constructor signature. |
| fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java | Wires per-table statistics collection into batch creation when enabled. |
| fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java | Passes statistics collector into Arrow batch builder. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
836948c to
51dd84c
Compare
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering. - Add LogRecordBatchStatistics and related classes for statistics collection - Add StatisticsConfigUtils for parsing table.statistics.columns configuration - Extend DefaultLogRecordBatch to support V2 format with statistics - Place statistics data between header and records with StatisticsLength field - Add comprehensive tests for statistics collection and parsing
51dd84c to
e595708
Compare
wuchong
left a comment
There was a problem hiding this comment.
Thanks @platinumhamburg , I left some comments.
…stics Config & API: - Change TABLE_STATISTICS_COLUMNS default from "*" to noDefaultValue() - Introduce StatisticsColumnsConfig with three-state mode (DISABLED/ALL/SPECIFIED) - Replace isBinaryType blacklist with isSupportedStatisticsType whitelist - Update config documentation with compatibility requirements Format versioning: - Renumber versions: V0=base, V1=statistics, V2=leaderEpoch(future) - Remove redundant STATISTICS_FLAG, use statisticsLength > 0 instead - Make statisticsLengthOffset/leaderEpochOffset throw for unsupported versions - Remove unsafe arrowChangeTypeOffset(magic) method Write path: - Serialize statistics directly to pagedOutputView (no temp byte[]) - Restore zero-copy CRC over contiguous memory segments - Fix estimatedSizeInBytes to use recordBatchHeaderSize(magic) Writer optimization: - Replace expensive estimatedSizeInBytes with cached heuristic estimate - Reuse AlignedRowWriter instance across writes - Extract backing byte[] directly from MemorySegment for OutputView writes Parser optimization: - Replace sequential MemorySegmentInputView reads with fixed-offset access - Add ByteBuffer.isDirect() check before wrapOffHeapMemory - Add skipBytes() to MemorySegmentInputView Correctness: - Remove redundant minSet/maxSet arrays in statistics collector - Throw UnsupportedOperationException for unsupported types in AlignedRow - Fix minValuesSize condition from <= 0 to == 0 Code cleanup: - Remove unused methods (getBytesView, createTrimmedBytesView, addBytes, etc.) - Add @nullable annotations on statisticsCollector and parseStatistics return - Add logging in statistics parsing catch blocks - Wrap clearStatisticsFromHeader with statisticsLength > 0 guard Tests & docs: - Add zero-rows and all-null column test cases - Update ConfigOptions docs with V1 format and compatibility notes
…stics Config & API: - Change TABLE_STATISTICS_COLUMNS default from "*" to noDefaultValue() - Introduce StatisticsColumnsConfig with three-state mode (DISABLED/ALL/SPECIFIED) - Replace isBinaryType blacklist with isSupportedStatisticsType whitelist - Update config documentation with compatibility requirements Format versioning: - Renumber versions: V0=base, V1=statistics, V2=leaderEpoch(future) - Remove redundant STATISTICS_FLAG, use statisticsLength > 0 instead - Make statisticsLengthOffset/leaderEpochOffset throw for unsupported versions - Remove unsafe arrowChangeTypeOffset(magic) method Write path: - Serialize statistics directly to pagedOutputView (no temp byte[]) - Restore zero-copy CRC over contiguous memory segments - Fix estimatedSizeInBytes to use recordBatchHeaderSize(magic) Writer optimization: - Replace expensive estimatedSizeInBytes with cached heuristic estimate - Reuse AlignedRowWriter instance across writes - Extract backing byte[] directly from MemorySegment for OutputView writes Parser optimization: - Replace sequential MemorySegmentInputView reads with fixed-offset access - Add ByteBuffer.isDirect() check before wrapOffHeapMemory - Add skipBytes() to MemorySegmentInputView Correctness: - Remove redundant minSet/maxSet arrays in statistics collector - Throw UnsupportedOperationException for unsupported types in AlignedRow - Fix minValuesSize condition from <= 0 to == 0 Code cleanup: - Remove unused methods (getBytesView, createTrimmedBytesView, addBytes, etc.) - Add @nullable annotations on statisticsCollector and parseStatistics return - Add logging in statistics parsing catch blocks - Wrap clearStatisticsFromHeader with statisticsLength > 0 guard Tests & docs: - Add zero-rows and all-null column test cases - Update ConfigOptions docs with V1 format and compatibility notes
8903de9 to
bd8123b
Compare
|
Thanks to @wuchong for the detailed and professional review comments. This PR indeed has many areas for improvement, and I have optimized and fixed it according to the feedback. Please continue reviewing when you have time. |
|
wuchong
left a comment
There was a problem hiding this comment.
@platinumhamburg I pushed a commit to address the comment , improve some code, and resolve the code coverage problem. Please take a look .
| @Override | ||
| public InternalRow getMinValues() { | ||
| if (minValuesSize == 0) { | ||
| return null; |
There was a problem hiding this comment.
Since we always serialize the max and min InternalRow for statistics, a minValuesSize of 0 is invalid and should be validated within the constructor. This approach simplifies the usage of getMinValues by eliminating the need to handle the nullability of the returned value.
Thanks a lot! This is a really solid improvement. |
Introduce V2 batch format that collects min/max statistics for each column to enable efficient filtering.
Purpose
Linked issue: close #2885
Brief change log
Tests
API and Format
Documentation