From 9db76b3b0ae428b352c04a92cdd2a24b88e8a5e7 Mon Sep 17 00:00:00 2001 From: "xuweixin.rex" Date: Mon, 29 Jun 2026 14:39:02 +0800 Subject: [PATCH] Support chain data split --- src/paimon/CMakeLists.txt | 2 + .../core/io/chain_data_file_path_factory.cpp | 45 ++++ .../core/io/chain_data_file_path_factory.h | 39 +++ src/paimon/core/io/data_file_path_factory.h | 2 +- .../core/operation/abstract_split_read.cpp | 17 ++ .../core/operation/abstract_split_read.h | 3 + .../operation/data_evolution_split_read.cpp | 9 +- .../core/operation/merge_file_split_read.cpp | 5 +- .../core/operation/raw_file_split_read.cpp | 17 +- .../core/operation/raw_file_split_read.h | 4 +- .../core/table/source/chain_data_split_impl.h | 70 ++++++ .../table/source/chain_data_split_test.cpp | 223 ++++++++++++++++++ .../core/table/source/data_split_impl.h | 14 +- src/paimon/core/table/source/split.cpp | 122 +++++++++- 14 files changed, 550 insertions(+), 22 deletions(-) create mode 100644 src/paimon/core/io/chain_data_file_path_factory.cpp create mode 100644 src/paimon/core/io/chain_data_file_path_factory.h create mode 100644 src/paimon/core/table/source/chain_data_split_impl.h create mode 100644 src/paimon/core/table/source/chain_data_split_test.cpp diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 3f99662aa..212eceb80 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -219,6 +219,7 @@ set(PAIMON_CORE_SRCS core/io/data_file_meta_first_row_id_legacy_serializer.cpp core/io/data_file_meta.cpp core/io/data_file_meta_serializer.cpp + core/io/chain_data_file_path_factory.cpp core/io/data_file_path_factory.cpp core/io/append_data_file_writer_factory.cpp core/io/blob_data_file_writer_factory.cpp @@ -733,6 +734,7 @@ if(PAIMON_BUILD_TESTS) core/table/source/table_read_test.cpp core/table/source/append_count_reader_test.cpp core/table/source/pk_count_reader_test.cpp + core/table/source/chain_data_split_test.cpp core/table/source/data_split_test.cpp core/table/source/deletion_file_test.cpp core/table/source/split_generator_test.cpp diff --git a/src/paimon/core/io/chain_data_file_path_factory.cpp b/src/paimon/core/io/chain_data_file_path_factory.cpp new file mode 100644 index 000000000..c5419899f --- /dev/null +++ b/src/paimon/core/io/chain_data_file_path_factory.cpp @@ -0,0 +1,45 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/io/chain_data_file_path_factory.h" + +#include + +#include "paimon/common/utils/path_util.h" +#include "paimon/core/io/data_file_meta.h" + +namespace paimon { + +ChainDataFilePathFactory::ChainDataFilePathFactory( + std::shared_ptr fallback, + std::unordered_map file_bucket_path_mapping) + : fallback_(std::move(fallback)), + file_bucket_path_mapping_(std::move(file_bucket_path_mapping)) {} + +std::string ChainDataFilePathFactory::ToPath(const std::shared_ptr& file_meta) const { + if (file_meta->external_path) { + return file_meta->external_path.value(); + } + + auto it = file_bucket_path_mapping_.find(file_meta->file_name); + if (it != file_bucket_path_mapping_.end()) { + return PathUtil::JoinPath(it->second, file_meta->file_name); + } + + return fallback_->ToPath(file_meta); +} + +} // namespace paimon diff --git a/src/paimon/core/io/chain_data_file_path_factory.h b/src/paimon/core/io/chain_data_file_path_factory.h new file mode 100644 index 000000000..30270a30f --- /dev/null +++ b/src/paimon/core/io/chain_data_file_path_factory.h @@ -0,0 +1,39 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/io/data_file_path_factory.h" + +namespace paimon { + +class ChainDataFilePathFactory : public DataFilePathFactory { + public: + ChainDataFilePathFactory(std::shared_ptr fallback, + std::unordered_map file_bucket_path_mapping); + + std::string ToPath(const std::shared_ptr& file_meta) const override; + + private: + std::shared_ptr fallback_; + std::unordered_map file_bucket_path_mapping_; +}; + +} // namespace paimon diff --git a/src/paimon/core/io/data_file_path_factory.h b/src/paimon/core/io/data_file_path_factory.h index 34a315a98..fddb06edc 100644 --- a/src/paimon/core/io/data_file_path_factory.h +++ b/src/paimon/core/io/data_file_path_factory.h @@ -76,7 +76,7 @@ class DataFilePathFactory : public PathFactory { } std::string ToPath(const std::string& file_name) const override; - std::string ToPath(const std::shared_ptr& file_meta) const; + virtual std::string ToPath(const std::shared_ptr& file_meta) const; const std::string& GetUUID() const { return uuid_; diff --git a/src/paimon/core/operation/abstract_split_read.cpp b/src/paimon/core/operation/abstract_split_read.cpp index 1e70c0554..46d5cc86d 100644 --- a/src/paimon/core/operation/abstract_split_read.cpp +++ b/src/paimon/core/operation/abstract_split_read.cpp @@ -33,6 +33,7 @@ #include "paimon/common/table/special_fields.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/object_utils.h" +#include "paimon/core/io/chain_data_file_path_factory.h" #include "paimon/core/io/complete_row_tracking_fields_reader.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/io/data_file_path_factory.h" @@ -40,6 +41,7 @@ #include "paimon/core/operation/internal_read_context.h" #include "paimon/core/partition/partition_info.h" #include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/source/chain_data_split_impl.h" #include "paimon/core/table/source/data_split_impl.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/nested_projection_utils.h" @@ -116,6 +118,21 @@ Result> AbstractSplitRead::ApplyPredicateFilterIfNe return PredicateBatchReader::Create(std::move(reader), predicate, pool_); } +Result> AbstractSplitRead::CreateDataFilePathFactory( + const std::shared_ptr& data_split) const { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr base_factory, + path_factory_->CreateDataFilePathFactory(data_split->Partition(), data_split->Bucket())); + + auto chain_split = std::dynamic_pointer_cast(data_split); + if (!chain_split) { + return base_factory; + } + + return std::make_shared(base_factory, + chain_split->FileBucketPathMapping()); +} + Result> AbstractSplitRead::PrepareReaderBuilder( const std::string& format_identifier) const { PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_format, diff --git a/src/paimon/core/operation/abstract_split_read.h b/src/paimon/core/operation/abstract_split_read.h index 4d49d78bc..22a481041 100644 --- a/src/paimon/core/operation/abstract_split_read.h +++ b/src/paimon/core/operation/abstract_split_read.h @@ -76,6 +76,9 @@ class AbstractSplitRead : public SplitRead { Result> ApplyPredicateFilterIfNeeded( std::unique_ptr&& reader, const std::shared_ptr& predicate) const; + Result> CreateDataFilePathFactory( + const std::shared_ptr& data_split) const; + protected: // return nullptr if file is skipped by index or dv virtual Result> ApplyIndexAndDvReaderIfNeeded( diff --git a/src/paimon/core/operation/data_evolution_split_read.cpp b/src/paimon/core/operation/data_evolution_split_read.cpp index fab21ac01..47f81b2d7 100644 --- a/src/paimon/core/operation/data_evolution_split_read.cpp +++ b/src/paimon/core/operation/data_evolution_split_read.cpp @@ -289,14 +289,13 @@ Result> DataEvolutionSplitRead::ExtractBlobVi Result> DataEvolutionSplitRead::InnerCreateReader( const std::shared_ptr& data_split, const std::optional>& row_ranges) const { - auto split_impl = dynamic_cast(data_split.get()); - if (split_impl == nullptr) { + auto split_impl = std::dynamic_pointer_cast(data_split); + if (!split_impl) { return Status::Invalid("unexpected error, split cast to impl failed"); } assert(raw_read_schema_->num_fields() > 0); - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr data_file_path_factory, - path_factory_->CreateDataFilePathFactory(split_impl->Partition(), split_impl->Bucket())); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + CreateDataFilePathFactory(split_impl)); auto metas = split_impl->DataFiles(); PAIMON_ASSIGN_OR_RAISE(std::vector>> split_by_row_id, MergeRangesAndSort(std::move(metas))); diff --git a/src/paimon/core/operation/merge_file_split_read.cpp b/src/paimon/core/operation/merge_file_split_read.cpp index e794a403d..8c295fb36 100644 --- a/src/paimon/core/operation/merge_file_split_read.cpp +++ b/src/paimon/core/operation/merge_file_split_read.cpp @@ -141,9 +141,8 @@ Result> MergeFileSplitRead::CreateReader( if (!data_split->BeforeFiles().empty()) { return Status::Invalid("this read cannot accept split with before files."); } - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr data_file_path_factory, - path_factory_->CreateDataFilePathFactory(data_split->Partition(), data_split->Bucket())); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + CreateDataFilePathFactory(data_split)); std::unique_ptr batch_reader; if (data_split->IsStreaming() || data_split->Bucket() == BucketModeDefine::POSTPONE_BUCKET) { PAIMON_ASSIGN_OR_RAISE( diff --git a/src/paimon/core/operation/raw_file_split_read.cpp b/src/paimon/core/operation/raw_file_split_read.cpp index 79755487e..42af0f78d 100644 --- a/src/paimon/core/operation/raw_file_split_read.cpp +++ b/src/paimon/core/operation/raw_file_split_read.cpp @@ -66,17 +66,26 @@ Result> RawFileSplitRead::CreateReader( if (!data_split) { return Status::Invalid("cannot cast split to data_split in RawFileSplitRead"); } + auto dv_factory = DeletionVector::CreateFactory( + options_.GetFileSystem(), + DeletionVector::CreateDeletionFileMap(data_split->DataFiles(), data_split->DeletionFiles()), + pool_); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + CreateDataFilePathFactory(data_split)); return CreateReader(data_split->Partition(), data_split->Bucket(), data_split->DataFiles(), - data_split->DeletionFiles()); + dv_factory, data_file_path_factory); } Result> RawFileSplitRead::CreateReader( const BinaryRow& partition, int32_t bucket, const std::vector>& data_files, - DeletionVector::Factory dv_factory) { + DeletionVector::Factory dv_factory, + std::shared_ptr data_file_path_factory) { const auto& predicate = context_->GetPredicate(); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, - path_factory_->CreateDataFilePathFactory(partition, bucket)); + if (!data_file_path_factory) { + PAIMON_ASSIGN_OR_RAISE(data_file_path_factory, + path_factory_->CreateDataFilePathFactory(partition, bucket)); + } PAIMON_ASSIGN_OR_RAISE( std::vector> raw_file_readers, diff --git a/src/paimon/core/operation/raw_file_split_read.h b/src/paimon/core/operation/raw_file_split_read.h index ebcbc2ef0..eb6924ad2 100644 --- a/src/paimon/core/operation/raw_file_split_read.h +++ b/src/paimon/core/operation/raw_file_split_read.h @@ -70,8 +70,8 @@ class RawFileSplitRead : public AbstractSplitRead { Result> CreateReader( const BinaryRow& partition, int32_t bucket, - const std::vector>& files, - DeletionVector::Factory dv_factory); + const std::vector>& files, DeletionVector::Factory dv_factory, + std::shared_ptr data_file_path_factory = nullptr); Result Match(const std::shared_ptr& split, bool force_keep_delete) const override; diff --git a/src/paimon/core/table/source/chain_data_split_impl.h b/src/paimon/core/table/source/chain_data_split_impl.h new file mode 100644 index 000000000..7588a3bfc --- /dev/null +++ b/src/paimon/core/table/source/chain_data_split_impl.h @@ -0,0 +1,70 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/core/table/source/data_split_impl.h" + +namespace paimon { + +class ChainDataSplitImpl : public DataSplitImpl { + public: + static constexpr const char* VIRTUAL_BUCKET_PATH = "placeholder::virtual-bucket-path"; + + ChainDataSplitImpl(const std::shared_ptr& base_split, bool all_snapshot_split, + const BinaryRow& read_partition, + std::unordered_map&& file_bucket_path_mapping, + std::unordered_map&& file_branch_mapping) + : DataSplitImpl(base_split->Partition(), base_split->Bucket(), base_split->BucketPath(), + std::vector>(base_split->DataFiles())), + all_snapshot_split_(all_snapshot_split), + read_partition_(read_partition), + file_bucket_path_mapping_(std::move(file_bucket_path_mapping)), + file_branch_mapping_(std::move(file_branch_mapping)) { + CopyMetadataFrom(*base_split); + } + + bool AllSnapshotSplit() const { + return all_snapshot_split_; + } + + const BinaryRow& ReadPartition() const { + return read_partition_; + } + + const std::unordered_map& FileBucketPathMapping() const { + return file_bucket_path_mapping_; + } + + const std::unordered_map& FileBranchMapping() const { + return file_branch_mapping_; + } + + private: + bool all_snapshot_split_; + BinaryRow read_partition_; + std::unordered_map file_bucket_path_mapping_; + std::unordered_map file_branch_mapping_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/chain_data_split_test.cpp b/src/paimon/core/table/source/chain_data_split_test.cpp new file mode 100644 index 000000000..82f27d451 --- /dev/null +++ b/src/paimon/core/table/source/chain_data_split_test.cpp @@ -0,0 +1,223 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/common/utils/serialization_utils.h" +#include "paimon/core/io/chain_data_file_path_factory.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_file_meta_serializer.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/table/source/chain_data_split_impl.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/deletion_file.h" +#include "paimon/data/timestamp.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/table/source/data_split.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +namespace { +constexpr const char* kFileName = "data-b446f78a-2cfb-4b3b-add8-31295d24a277-0.parquet"; +constexpr const char* kBucketPath = "data/parquet/append_09.db/append_09/f1=20/bucket-0"; +constexpr const char* kBranch = "delta"; +constexpr const char* kFallbackBucketPath = "data/parquet/append_09.db/append_09/f1=10/bucket-1"; +constexpr const char* kSingleSplitBucketPath = + "data/parquet/append_table_with_append_pt_branch.db/append_table_with_append_pt_branch/" + "pt=2/bucket-0"; +constexpr const char* kSingleSplitFileName0 = "data-39204ff8-55b2-497b-8e87-a1c736799eab-0.parquet"; +constexpr const char* kSingleSplitFileName1 = "data-625b3277-84d3-4320-80b9-89a5075bf5fd-0.parquet"; + +std::shared_ptr CreateDataFileMeta( + const std::string& file_name, std::optional external_path = std::nullopt) { + return std::make_shared( + file_name, /*file_size=*/1024, /*row_count=*/7, BinaryRow::EmptyRow(), + BinaryRow::EmptyRow(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + DataFileMeta::DUMMY_LEVEL, std::vector>(), Timestamp(0, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, external_path, + /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); +} + +void WriteStringMap(MemorySegmentOutputStream* out, + const std::unordered_map& values) { + out->WriteValue(values.size()); + for (const auto& [key, value] : values) { + out->WriteString(key); + out->WriteString(value); + } +} + +std::string ToString(const MemorySegmentOutputStream& out, + const std::shared_ptr& pool) { + PAIMON_UNIQUE_PTR bytes = + MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), pool.get()); + return std::string(bytes->data(), bytes->size()); +} + +std::string SerializeVersion7DataSplit(const std::shared_ptr& split, + const std::shared_ptr& pool) { + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + out.WriteValue(DataSplitImpl::MAGIC); + out.WriteValue(7); + out.WriteValue(split->SnapshotId()); + EXPECT_OK(SerializationUtils::SerializeBinaryRow(split->Partition(), &out)); + out.WriteValue(split->Bucket()); + out.WriteString(split->BucketPath()); + + out.WriteValue(split->TotalBuckets().has_value()); + if (split->TotalBuckets().has_value()) { + out.WriteValue(split->TotalBuckets().value()); + } + + DataFileMetaSerializer serializer(pool); + EXPECT_OK(serializer.SerializeList(split->BeforeFiles(), &out)); + DeletionFile::SerializeList(split->BeforeDeletionFiles(), &out); + EXPECT_OK(serializer.SerializeList(split->DataFiles(), &out)); + DeletionFile::SerializeList(split->DeletionFiles(), &out); + out.WriteValue(split->IsStreaming()); + out.WriteValue(split->RawConvertible()); + return ToString(out, pool); +} + +std::shared_ptr CreateBaseSplit(const std::shared_ptr& pool) { + DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/118, + ChainDataSplitImpl::VIRTUAL_BUCKET_PATH, + {CreateDataFileMeta(kFileName)}); + return builder.WithSnapshot(42) + .WithTotalBuckets(256) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value(); +} + +std::string AppendChainDataSplitTail( + const std::string& base_bytes, const std::shared_ptr& pool, + const std::unordered_map& bucket_paths, + const std::unordered_map& branches) { + MemorySegmentOutputStream tail(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + tail.WriteValue(true); + EXPECT_OK(SerializationUtils::SerializeBinaryRow(BinaryRow::EmptyRow(), &tail)); + WriteStringMap(&tail, bucket_paths); + WriteStringMap(&tail, branches); + return base_bytes + ToString(tail, pool); +} + +std::string AppendMalformedChainDataSplitTail(const std::string& base_bytes, + const std::shared_ptr& pool) { + MemorySegmentOutputStream tail(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + tail.WriteValue(true); + return base_bytes + ToString(tail, pool); +} + +} // namespace + +TEST(ChainDataSplitTest, DeserializeChainDataSplitTail) { + auto pool = GetDefaultPool(); + auto base_split = CreateBaseSplit(pool); + ASSERT_OK_AND_ASSIGN(std::string base_bytes, Split::Serialize(base_split, pool)); + + std::string chain_bytes = AppendChainDataSplitTail(base_bytes, pool, {{kFileName, kBucketPath}}, + {{kFileName, kBranch}}); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr split, + Split::Deserialize(chain_bytes.data(), chain_bytes.size(), pool)); + auto chain_split = std::dynamic_pointer_cast(split); + ASSERT_TRUE(chain_split); + EXPECT_TRUE(chain_split->AllSnapshotSplit()); + EXPECT_EQ(chain_split->BucketPath(), ChainDataSplitImpl::VIRTUAL_BUCKET_PATH); + EXPECT_EQ(chain_split->FileBucketPathMapping().at(kFileName), kBucketPath); + EXPECT_EQ(chain_split->FileBranchMapping().at(kFileName), kBranch); +} + +TEST(ChainDataSplitTest, DeserializeVersion7SingleSplitWithOriginalBucketPath) { + auto pool = GetDefaultPool(); + DataSplitImpl::Builder builder( + BinaryRow::EmptyRow(), /*bucket=*/3, kSingleSplitBucketPath, + {CreateDataFileMeta(kSingleSplitFileName0), CreateDataFileMeta(kSingleSplitFileName1)}); + ASSERT_OK_AND_ASSIGN(auto base_split, builder.WithSnapshot(42) + .WithTotalBuckets(256) + .IsStreaming(false) + .RawConvertible(true) + .Build()); + std::string base_bytes = SerializeVersion7DataSplit(base_split, pool); + std::string split_bytes = AppendChainDataSplitTail( + base_bytes, pool, + {{kSingleSplitFileName0, kSingleSplitBucketPath}, + {kSingleSplitFileName1, kSingleSplitBucketPath}}, + {{kSingleSplitFileName0, "snapshot"}, {kSingleSplitFileName1, "snapshot"}}); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr split, + Split::Deserialize(split_bytes.data(), split_bytes.size(), pool)); + auto chain_split = std::dynamic_pointer_cast(split); + ASSERT_TRUE(chain_split); + EXPECT_TRUE(chain_split->AllSnapshotSplit()); + EXPECT_EQ(chain_split->BucketPath(), kSingleSplitBucketPath); + EXPECT_EQ(chain_split->FileBucketPathMapping().at(kSingleSplitFileName0), + kSingleSplitBucketPath); + EXPECT_EQ(chain_split->FileBucketPathMapping().at(kSingleSplitFileName1), + kSingleSplitBucketPath); + EXPECT_EQ(chain_split->FileBranchMapping().at(kSingleSplitFileName0), "snapshot"); + EXPECT_EQ(chain_split->FileBranchMapping().at(kSingleSplitFileName1), "snapshot"); +} + +TEST(ChainDataSplitTest, ChainDataFilePathFactoryUsesPerFileBucketPath) { + auto fallback = std::make_shared(); + ASSERT_OK(fallback->Init(kFallbackBucketPath, "parquet", "data-", + /*external_path_provider=*/nullptr)); + ChainDataFilePathFactory factory(fallback, {{kFileName, kBucketPath}}); + + auto file_meta = CreateDataFileMeta(kFileName); + EXPECT_EQ(factory.ToPath(file_meta), std::string(kBucketPath) + "/" + kFileName); +} + +TEST(ChainDataSplitTest, ChainDataFilePathFactoryPreservesExternalPath) { + auto fallback = std::make_shared(); + ASSERT_OK(fallback->Init(kFallbackBucketPath, "parquet", "data-", + /*external_path_provider=*/nullptr)); + ChainDataFilePathFactory factory(fallback, {{kFileName, kBucketPath}}); + + const std::string external_path = "hdfs://external/path/" + std::string(kFileName); + auto file_meta = CreateDataFileMeta(kFileName, external_path); + EXPECT_EQ(factory.ToPath(file_meta), external_path); +} + +TEST(ChainDataSplitTest, MalformedChainDataSplitTailReturnsContextualError) { + auto pool = GetDefaultPool(); + auto base_split = CreateBaseSplit(pool); + ASSERT_OK_AND_ASSIGN(std::string base_bytes, Split::Serialize(base_split, pool)); + + std::string chain_bytes = AppendMalformedChainDataSplitTail(base_bytes, pool); + + ASSERT_NOK_WITH_MSG(Split::Deserialize(chain_bytes.data(), chain_bytes.size(), pool), + "ChainDataSplit"); +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/source/data_split_impl.h b/src/paimon/core/table/source/data_split_impl.h index a90ca55dc..e8bb7ebd7 100644 --- a/src/paimon/core/table/source/data_split_impl.h +++ b/src/paimon/core/table/source/data_split_impl.h @@ -177,7 +177,7 @@ class DataSplitImpl : public DataSplit { std::string ToString() const; - private: + protected: DataSplitImpl(const BinaryRow& partition, int32_t bucket, const std::string& bucket_path, std::vector>&& data_files) : partition_(partition), @@ -185,11 +185,21 @@ class DataSplitImpl : public DataSplit { bucket_path_(bucket_path), data_files_(std::move(data_files)) {} + void CopyMetadataFrom(const DataSplitImpl& other) { + total_buckets_ = other.total_buckets_; + snapshot_id_ = other.snapshot_id_; + before_files_ = other.before_files_; + before_deletion_files_ = other.before_deletion_files_; + data_deletion_files_ = other.data_deletion_files_; + is_streaming_ = other.is_streaming_; + raw_convertible_ = other.raw_convertible_; + } + + private: Result> RawMergedRowCount(DeletionVector::Factory dv_factory) const; bool DataEvolutionRowCountAvailable() const; Result DataEvolutionMergedRowCount() const; - private: int64_t snapshot_id_ = 0; BinaryRow partition_ = BinaryRow::EmptyRow(); int32_t bucket_ = -1; diff --git a/src/paimon/core/table/source/split.cpp b/src/paimon/core/table/source/split.cpp index 2d188cd53..c3059796b 100644 --- a/src/paimon/core/table/source/split.cpp +++ b/src/paimon/core/table/source/split.cpp @@ -14,6 +14,7 @@ * limitations under the License. */ +#include #include #include "fmt/format.h" @@ -22,7 +23,9 @@ #include "paimon/common/memory/memory_segment_utils.h" #include "paimon/common/utils/serialization_utils.h" #include "paimon/core/global_index/indexed_split_impl.h" +#include "paimon/core/io/data_file_meta_first_row_id_legacy_serializer.h" #include "paimon/core/io/data_file_meta_serializer.h" +#include "paimon/core/table/source/chain_data_split_impl.h" #include "paimon/core/table/source/data_split_impl.h" #include "paimon/core/table/source/deletion_file.h" #include "paimon/core/table/source/fallback_data_split.h" @@ -37,6 +40,65 @@ namespace paimon { struct DataFileMeta; namespace { +Result>> ReadVersion7DataFileMetaList( + DataInputStream* in, const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(int32_t size, in->ReadValue()); + if (size < 0) { + return Status::Invalid(fmt::format("invalid data file meta list size: {}", size)); + } + + DataFileMetaFirstRowIdLegacySerializer legacy_serializer(pool); + DataFileMetaSerializer current_serializer(pool); + std::vector> result; + result.reserve(size); + for (int32_t i = 0; i < size; ++i) { + PAIMON_ASSIGN_OR_RAISE(int32_t row_size, in->ReadValue()); + if (row_size < BinaryRow::CalculateFixPartSizeInBytes(19)) { + return Status::Invalid( + fmt::format("invalid version 7 data file meta row size: {}", row_size)); + } + std::shared_ptr bytes = Bytes::AllocateBytes(row_size, pool.get()); + PAIMON_RETURN_NOT_OK(in->ReadBytes(bytes.get())); + + MemorySegment segment = MemorySegment::Wrap(bytes); + int64_t file_name_offset_and_size = + segment.GetValue(BinaryRow::CalculateBitSetWidthInBytes(/*arity=*/19)); + if ((file_name_offset_and_size & BinarySection::HIGHEST_FIRST_BIT) != 0) { + return Status::Invalid( + "cannot determine version 7 data file meta format from inline file name"); + } + int32_t variable_part_offset = static_cast(file_name_offset_and_size >> 32); + + int32_t arity = 0; + const ObjectSerializer>* serializer = nullptr; + if (variable_part_offset == BinaryRow::CalculateFixPartSizeInBytes(/*arity=*/19)) { + arity = 19; + serializer = &legacy_serializer; + } else if (variable_part_offset == BinaryRow::CalculateFixPartSizeInBytes(/*arity=*/20)) { + arity = 20; + serializer = ¤t_serializer; + } else { + return Status::Invalid(fmt::format( + "invalid version 7 data file meta variable part offset: {}", variable_part_offset)); + } + + BinaryRow row(arity); + row.PointTo(segment, /*offset=*/0, row_size); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr meta, serializer->FromRow(row)); + result.emplace_back(std::move(meta)); + } + return result; +} + +Result>> ReadDataFileMetaList( + int32_t version, const ObjectSerializer>* data_file_serializer, + DataInputStream* in, const std::shared_ptr& pool) { + if (version == 7) { + return ReadVersion7DataFileMetaList(in, pool); + } + return data_file_serializer->DeserializeList(in); +} + Status WriteDataSplit(const std::shared_ptr& data_split_impl, MemorySegmentOutputStream* out, const std::shared_ptr& pool) { out->WriteValue(DataSplitImpl::MAGIC); @@ -98,13 +160,15 @@ Result> ReadDataSplitWithoutMagicNumber( std::unique_ptr>> data_file_serializer, DataSplitImpl::GetFileMetaSerializer(version, pool)); std::vector> before_files; - PAIMON_ASSIGN_OR_RAISE(before_files, data_file_serializer->DeserializeList(in)); + PAIMON_ASSIGN_OR_RAISE(before_files, + ReadDataFileMetaList(version, data_file_serializer.get(), in, pool)); // compatible for deletion file std::vector> before_deletion_files; PAIMON_ASSIGN_OR_RAISE(before_deletion_files, DeletionFile::DeserializeList(in, version)); std::vector> data_files; - PAIMON_ASSIGN_OR_RAISE(data_files, data_file_serializer->DeserializeList(in)); + PAIMON_ASSIGN_OR_RAISE(data_files, + ReadDataFileMetaList(version, data_file_serializer.get(), in, pool)); // compatible for deletion file std::vector> data_deletion_files; PAIMON_ASSIGN_OR_RAISE(data_deletion_files, DeletionFile::DeserializeList(in, version)); @@ -129,6 +193,44 @@ Result> ReadDataSplitWithoutMagicNumber( return builder.Build(); } +Result> ReadStringMap(DataInputStream* in) { + PAIMON_ASSIGN_OR_RAISE(int32_t size, in->ReadValue()); + if (size < 0) { + return Status::Invalid(fmt::format("invalid string map size: {}", size)); + } + + std::unordered_map result; + result.reserve(size); + for (int32_t i = 0; i < size; ++i) { + PAIMON_ASSIGN_OR_RAISE(std::string key, in->ReadString()); + PAIMON_ASSIGN_OR_RAISE(std::string value, in->ReadString()); + result.emplace(std::move(key), std::move(value)); + } + return result; +} + +Result> ReadChainDataSplitTail( + const std::shared_ptr& base_split, DataInputStream* in, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(bool all_snapshot_split, in->ReadValue()); + PAIMON_ASSIGN_OR_RAISE(BinaryRow read_partition, + SerializationUtils::DeserializeBinaryRow(in, pool.get())); + PAIMON_ASSIGN_OR_RAISE(auto file_bucket_path_mapping, ReadStringMap(in)); + PAIMON_ASSIGN_OR_RAISE(auto file_branch_mapping, ReadStringMap(in)); + + PAIMON_ASSIGN_OR_RAISE(int64_t pos, in->GetPos()); + PAIMON_ASSIGN_OR_RAISE(int64_t stream_length, in->Length()); + if (pos != stream_length) { + return Status::Invalid(fmt::format( + "invalid ChainDataSplit byte stream, remaining {} bytes after deserializing", + stream_length - pos)); + } + + return std::make_shared(base_split, all_snapshot_split, read_partition, + std::move(file_bucket_path_mapping), + std::move(file_branch_mapping)); +} + } // namespace Result Split::Serialize(const std::shared_ptr& split, @@ -224,13 +326,23 @@ Result> Split::Deserialize(const char* buffer, size_t len PAIMON_ASSIGN_OR_RAISE(int64_t stream_length, in.Length()); if (pos == stream_length) { return data_split; + } else if (data_split->BucketPath() == ChainDataSplitImpl::VIRTUAL_BUCKET_PATH) { + auto chain_split = ReadChainDataSplitTail(data_split, &in, pool); + if (!chain_split.ok()) { + return Status::Invalid(fmt::format("invalid ChainDataSplit byte stream: {}", + chain_split.status().ToString())); + } + return std::static_pointer_cast(chain_split.value()); } else if (pos == stream_length - 1) { PAIMON_ASSIGN_OR_RAISE(bool is_fallback, in.ReadValue()); return std::make_shared(data_split, is_fallback); } else { - return Status::Invalid(fmt::format( - "invalid data split byte stream, remaining {} bytes after deserializing", - stream_length - pos)); + auto chain_split = ReadChainDataSplitTail(data_split, &in, pool); + if (!chain_split.ok()) { + return Status::Invalid(fmt::format("invalid ChainDataSplit byte stream: {}", + chain_split.status().ToString())); + } + return std::static_pointer_cast(chain_split.value()); } } return Status::Invalid("invalid split, must be DataSplit or IndexedSplit");