diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 3f99662aa..171502865 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -736,6 +736,7 @@ if(PAIMON_BUILD_TESTS) core/table/source/data_split_test.cpp core/table/source/deletion_file_test.cpp core/table/source/split_generator_test.cpp + core/table/source/snapshot/snapshot_reader_test.cpp core/table/source/startup_mode_test.cpp core/table/source/table_scan_test.cpp core/table/system/system_table_test.cpp diff --git a/src/paimon/core/deletionvectors/deletion_file_writer.cpp b/src/paimon/core/deletionvectors/deletion_file_writer.cpp index 12515fd41..9dc4dd614 100644 --- a/src/paimon/core/deletionvectors/deletion_file_writer.cpp +++ b/src/paimon/core/deletionvectors/deletion_file_writer.cpp @@ -42,8 +42,8 @@ Status DeletionFileWriter::Write(const std::string& key, } DataOutputStream output_stream(out_); PAIMON_ASSIGN_OR_RAISE(int32_t length, deletion_vector->SerializeTo(pool_, &output_stream)); - dv_metas_.insert(key, DeletionVectorMeta(key, static_cast(start), length, - deletion_vector->GetCardinality())); + dv_metas_.insert_or_assign(key, DeletionVectorMeta(key, static_cast(start), length, + deletion_vector->GetCardinality())); return Status::OK(); } diff --git a/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp b/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp index 5c0f0382f..1226573ac 100644 --- a/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp +++ b/src/paimon/core/deletionvectors/deletion_file_writer_test.cpp @@ -95,6 +95,39 @@ TEST(DeletionFileWriterTest, GetResultWithoutCloseShouldFail) { ASSERT_NOK_WITH_MSG(writer->GetResult(), "Deletion file result length -1 out of int32 range"); } +TEST(DeletionFileWriterTest, WriteOverwritesDuplicateDataFileName) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + ASSERT_OK_AND_ASSIGN(auto writer, DeletionFileWriter::Create(path_factory, fs, pool)); + + RoaringBitmap32 roaring_1; + roaring_1.Add(1); + auto dv_1 = std::make_shared(roaring_1); + + RoaringBitmap32 roaring_2; + roaring_2.Add(2); + roaring_2.Add(3); + auto dv_2 = std::make_shared(roaring_2); + + ASSERT_OK(writer->Write("data-file-1", dv_1)); + ASSERT_OK(writer->Write("data-file-1", dv_2)); + ASSERT_OK(writer->Close()); + + ASSERT_OK_AND_ASSIGN(auto meta, writer->GetResult()); + const auto& dv_ranges = meta->DvRanges(); + ASSERT_TRUE(dv_ranges.has_value()); + ASSERT_EQ(dv_ranges->size(), 1); + + auto iter = dv_ranges->find("data-file-1"); + ASSERT_NE(iter, dv_ranges->end()); + ASSERT_GT(iter->second.GetOffset(), 1); + ASSERT_EQ(iter->second.GetCardinality(), std::optional(2)); +} + TEST(DeletionFileWriterTest, ExternalPathInResult) { auto dir = UniqueTestDirectory::Create(); ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, diff --git a/src/paimon/core/deletionvectors/deletion_vector.cpp b/src/paimon/core/deletionvectors/deletion_vector.cpp index 9ea909d39..51641887e 100644 --- a/src/paimon/core/deletionvectors/deletion_vector.cpp +++ b/src/paimon/core/deletionvectors/deletion_vector.cpp @@ -70,7 +70,7 @@ std::unordered_map DeletionVector::CreateDeletionFile assert(deletion_files.size() == data_files.size()); for (size_t i = 0; i < deletion_files.size(); i++) { if (deletion_files[i] != std::nullopt) { - deletion_file_map.emplace(data_files[i]->file_name, deletion_files[i].value()); + deletion_file_map.insert_or_assign(data_files[i]->file_name, deletion_files[i].value()); } } return deletion_file_map; diff --git a/src/paimon/core/deletionvectors/deletion_vector_test.cpp b/src/paimon/core/deletionvectors/deletion_vector_test.cpp index d7c1bdbcc..eb75d5ad0 100644 --- a/src/paimon/core/deletionvectors/deletion_vector_test.cpp +++ b/src/paimon/core/deletionvectors/deletion_vector_test.cpp @@ -193,6 +193,18 @@ TEST(DeletionVectorTest, CreateDeletionFileMap) { ASSERT_EQ(deletion_file_map.at("file-0.orc"), deletion_file_0); ASSERT_EQ(deletion_file_map.count("file-1.orc"), 0); ASSERT_EQ(deletion_file_map.at("file-2.orc"), deletion_file_2); + + DeletionFile deletion_file_0_new("dv-0-new", /*offset=*/50, /*length=*/60, + /*cardinality=*/7); + std::vector> duplicate_data_files = { + CreateDataFileMeta("file-0.orc"), CreateDataFileMeta("file-0.orc")}; + std::vector> duplicate_deletion_files = {deletion_file_0, + deletion_file_0_new}; + + auto duplicate_deletion_file_map = + DeletionVector::CreateDeletionFileMap(duplicate_data_files, duplicate_deletion_files); + ASSERT_EQ(duplicate_deletion_file_map.size(), 1); + ASSERT_EQ(duplicate_deletion_file_map.at("file-0.orc"), deletion_file_0_new); } } // namespace paimon::test diff --git a/src/paimon/core/manifest/index_manifest_file_handler.cpp b/src/paimon/core/manifest/index_manifest_file_handler.cpp index fd5808ddf..d249775aa 100644 --- a/src/paimon/core/manifest/index_manifest_file_handler.cpp +++ b/src/paimon/core/manifest/index_manifest_file_handler.cpp @@ -30,33 +30,31 @@ std::vector IndexManifestFileHandler::BucketedCombiner::Comb const std::vector& new_index_files) const { std::unordered_map index_entries; for (const auto& entry : prev_index_files) { - index_entries.emplace( + index_entries.insert_or_assign( BucketIdentifier(entry.partition, entry.bucket, entry.index_file->IndexType()), entry); } - std::unordered_map removed; + std::vector removed; removed.reserve(new_index_files.size()); - std::unordered_map added; + std::vector added; added.reserve(new_index_files.size()); for (const auto& entry : new_index_files) { if (entry.kind == FileKind::Delete()) { - removed.emplace( - BucketIdentifier(entry.partition, entry.bucket, entry.index_file->IndexType()), - entry); + removed.push_back(entry); } else if (entry.kind == FileKind::Add()) { - added.emplace( - BucketIdentifier(entry.partition, entry.bucket, entry.index_file->IndexType()), - entry); + added.push_back(entry); } } // The deleted entry is processed first to avoid overwriting a new entry. for (const auto& entry : removed) { - index_entries.erase(entry.first); + index_entries.erase( + BucketIdentifier(entry.partition, entry.bucket, entry.index_file->IndexType())); } for (const auto& entry : added) { - index_entries.emplace(entry.first, entry.second); + index_entries.insert_or_assign( + BucketIdentifier(entry.partition, entry.bucket, entry.index_file->IndexType()), entry); } std::vector result_entries; @@ -72,7 +70,7 @@ std::vector IndexManifestFileHandler::GlobalFileNameCombiner const std::vector& new_index_files) const { std::map index_entries; for (const auto& entry : prev_index_files) { - index_entries.emplace(entry.index_file->FileName(), entry); + index_entries.insert_or_assign(entry.index_file->FileName(), entry); } std::vector removed; @@ -93,7 +91,7 @@ std::vector IndexManifestFileHandler::GlobalFileNameCombiner index_entries.erase(entry.index_file->FileName()); } for (const auto& entry : added) { - index_entries.emplace(entry.index_file->FileName(), entry); + index_entries.insert_or_assign(entry.index_file->FileName(), entry); } std::vector result_entries; diff --git a/src/paimon/core/manifest/index_manifest_file_handler.h b/src/paimon/core/manifest/index_manifest_file_handler.h index 46828c8de..b7df8ed44 100644 --- a/src/paimon/core/manifest/index_manifest_file_handler.h +++ b/src/paimon/core/manifest/index_manifest_file_handler.h @@ -46,7 +46,7 @@ class IndexManifestFileHandler { const std::vector& new_index_files) const = 0; }; - /// Combine previous and new global index files by file `BucketIdentifier`. + /// Combine previous and new index files by partition, bucket and index type. class BucketedCombiner : public IndexManifestFileCombiner { public: std::vector Combine( @@ -54,7 +54,7 @@ class IndexManifestFileHandler { const std::vector& new_index_files) const override; }; - /// Combine previous and new global index files by file name. + /// Combine previous and new index files by file name. class GlobalFileNameCombiner : public IndexManifestFileCombiner { public: std::vector Combine( diff --git a/src/paimon/core/manifest/index_manifest_file_handler_test.cpp b/src/paimon/core/manifest/index_manifest_file_handler_test.cpp index 467fd952d..4a5d03a83 100644 --- a/src/paimon/core/manifest/index_manifest_file_handler_test.cpp +++ b/src/paimon/core/manifest/index_manifest_file_handler_test.cpp @@ -74,6 +74,25 @@ class IndexManifestFileHandlerTest : public testing::Test { /*external_path=*/std::nullopt)); } + static IndexManifestEntry MakeDvEntry(const FileKind& kind, const BinaryRow& partition, + int32_t bucket, const std::string& file_name, + const std::vector& data_file_names, + int64_t row_count) { + LinkedHashMap dv_ranges; + int32_t offset = 0; + for (const auto& data_file_name : data_file_names) { + dv_ranges.insert(data_file_name, + DeletionVectorMeta(data_file_name, offset, /*length=*/10, + /*cardinality=*/std::nullopt)); + offset += 10; + } + return IndexManifestEntry( + kind, partition, bucket, + std::make_shared(DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, + file_name, /*file_size=*/row_count * 10, row_count, + dv_ranges, /*external_path=*/std::nullopt)); + } + std::shared_ptr pool_; std::unique_ptr dir_; }; @@ -154,13 +173,55 @@ TEST_F(IndexManifestFileHandlerTest, BucketedCombinerUsesPartitionBucketAndIndex ASSERT_TRUE(found_bucket1); } +TEST_F(IndexManifestFileHandlerTest, BucketedCombinerOverwritesDuplicateAddedEntries) { + ASSERT_OK_AND_ASSIGN(auto index_manifest_file, CreateManifestFile(/*bucket_mode=*/2)); + + auto partition = BinaryRowGenerator::GenerateRow({10}, pool_.get()); + std::vector new_entries = { + MakeEntry(FileKind::Add(), partition, /*bucket=*/0, + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "dv-0-old", 10), + MakeEntry(FileKind::Add(), partition, /*bucket=*/0, + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "dv-0-new", 20)}; + + ASSERT_OK_AND_ASSIGN(std::string current_manifest, + IndexManifestFileHandler::Write( + /*previous_index_manifest=*/std::nullopt, new_entries, + /*bucket_mode=*/2, index_manifest_file.get())); + + std::vector written_entries; + ASSERT_OK(index_manifest_file->Read(current_manifest, /*filter=*/nullptr, &written_entries)); + ASSERT_EQ(written_entries.size(), 1); + ASSERT_EQ(written_entries[0].index_file->FileName(), "dv-0-new"); + ASSERT_EQ(written_entries[0].index_file->RowCount(), 20); +} + +TEST_F(IndexManifestFileHandlerTest, GlobalCombinerOverwritesDuplicateAddedEntries) { + ASSERT_OK_AND_ASSIGN(auto index_manifest_file, CreateManifestFile(/*bucket_mode=*/4)); + + auto partition = BinaryRow::EmptyRow(); + std::vector new_entries = { + MakeEntry(FileKind::Add(), partition, /*bucket=*/0, /*index_type=*/"BTREE", "global-0", 10), + MakeEntry(FileKind::Add(), partition, /*bucket=*/0, /*index_type=*/"BTREE", "global-0", + 20)}; + + ASSERT_OK_AND_ASSIGN(std::string current_manifest, + IndexManifestFileHandler::Write( + /*previous_index_manifest=*/std::nullopt, new_entries, + /*bucket_mode=*/4, index_manifest_file.get())); + + std::vector written_entries; + ASSERT_OK(index_manifest_file->Read(current_manifest, /*filter=*/nullptr, &written_entries)); + ASSERT_EQ(written_entries.size(), 1); + ASSERT_EQ(written_entries[0].index_file->FileName(), "global-0"); + ASSERT_EQ(written_entries[0].index_file->RowCount(), 20); +} + TEST_F(IndexManifestFileHandlerTest, DvWithBucketUnawareModeReturnsNotImplemented) { ASSERT_OK_AND_ASSIGN(auto index_manifest_file, CreateManifestFile(/*bucket_mode=*/-1)); auto partition = BinaryRow::EmptyRow(); std::vector new_entries = { - MakeEntry(FileKind::Add(), partition, /*bucket=*/0, - DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "dv-0", 1)}; + MakeDvEntry(FileKind::Add(), partition, /*bucket=*/0, "dv-0", {"data-0.orc"}, 1)}; ASSERT_NOK_WITH_MSG(IndexManifestFileHandler::Write( /*previous_index_manifest=*/std::nullopt, new_entries, diff --git a/src/paimon/core/table/source/snapshot/snapshot_reader.cpp b/src/paimon/core/table/source/snapshot/snapshot_reader.cpp index a0d8e420a..1851a951e 100644 --- a/src/paimon/core/table/source/snapshot/snapshot_reader.cpp +++ b/src/paimon/core/table/source/snapshot/snapshot_reader.cpp @@ -120,8 +120,8 @@ Result>> SnapshotReader::GetDeletionFile if (dv_metas != std::nullopt) { for (const auto& dv_meta_iter : dv_metas.value()) { const auto& dv_meta = dv_meta_iter.second; - data_file_to_index_file_meta.insert( - std::make_pair(dv_meta.GetDataFileName(), index_file_meta)); + data_file_to_index_file_meta.insert_or_assign(dv_meta.GetDataFileName(), + index_file_meta); } } } diff --git a/src/paimon/core/table/source/snapshot/snapshot_reader_test.cpp b/src/paimon/core/table/source/snapshot/snapshot_reader_test.cpp new file mode 100644 index 000000000..4d57e21d6 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/snapshot_reader_test.cpp @@ -0,0 +1,112 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/source/snapshot/snapshot_reader.h" + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "gtest/gtest.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/index/index_file_handler.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/index_file_path_factories.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class SnapshotReaderTest : public testing::Test { + protected: + void SetUp() override { + pool_ = GetDefaultPool(); + dir_ = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir_ != nullptr); + } + + std::shared_ptr CreateDataFileMeta(const std::string& file_name) const { + return std::make_shared( + file_name, /*file_size=*/100, /*row_count=*/10, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + DataFileMeta::DUMMY_LEVEL, std::vector>{}, Timestamp(0, 0), + std::nullopt, nullptr, FileSource::Append(), std::nullopt, std::nullopt, std::nullopt, + std::nullopt); + } + + std::shared_ptr CreateIndexFileMeta(const std::string& index_file_name, + const std::string& data_file_name, + int64_t offset, int64_t length, + std::optional cardinality) const { + LinkedHashMap dv_ranges; + dv_ranges.insert(data_file_name, + DeletionVectorMeta(data_file_name, offset, length, cardinality)); + return std::make_shared(DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, + index_file_name, /*file_size=*/100, /*row_count=*/10, + dv_ranges, /*external_path=*/std::nullopt); + } + + Result> CreateIndexFileHandler() const { + auto schema = arrow::schema({arrow::field("f0", arrow::int32())}); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + FileStorePathFactory::Create( + dir_->Str(), schema, /*partition_keys=*/{}, /*default_part_value=*/"", "orc", + /*data_file_prefix=*/"data-", /*legacy_partition_name_enabled=*/true, + /*external_paths=*/{}, /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, pool_)); + auto path_factories = std::make_shared(path_factory); + return std::make_unique(std::make_shared(), + std::unique_ptr(), + path_factories, /*dv_bitmap64=*/false, pool_); + } + + std::shared_ptr pool_; + std::unique_ptr dir_; +}; + +TEST_F(SnapshotReaderTest, GetDeletionFilesOverwritesDuplicateDataFileName) { + ASSERT_OK_AND_ASSIGN(std::unique_ptr index_file_handler, + CreateIndexFileHandler()); + SnapshotReader snapshot_reader(/*scan=*/nullptr, /*path_factory=*/nullptr, + /*split_generator=*/nullptr, std::move(index_file_handler)); + + const std::string data_file_name = "data-0.orc"; + std::vector> data_files = {CreateDataFileMeta(data_file_name)}; + std::vector> index_file_metas = { + CreateIndexFileMeta("index-first", data_file_name, /*offset=*/1, /*length=*/11, + /*cardinality=*/3), + CreateIndexFileMeta("index-second", data_file_name, /*offset=*/2, /*length=*/22, + /*cardinality=*/4)}; + + ASSERT_OK_AND_ASSIGN(std::vector> deletion_files, + snapshot_reader.GetDeletionFiles(BinaryRow::EmptyRow(), /*bucket=*/0, + data_files, index_file_metas)); + + ASSERT_EQ(deletion_files.size(), 1); + ASSERT_TRUE(deletion_files[0].has_value()); + EXPECT_EQ(deletion_files[0]->path, dir_->Str() + "/index/index-second"); + EXPECT_EQ(deletion_files[0]->offset, 2); + EXPECT_EQ(deletion_files[0]->length, 22); + EXPECT_EQ(deletion_files[0]->cardinality, std::optional(4)); +} + +} // namespace paimon::test