diff --git a/src/paimon/core/manifest/index_manifest_file_handler.cpp b/src/paimon/core/manifest/index_manifest_file_handler.cpp index fd5808ddf..ab118b3af 100644 --- a/src/paimon/core/manifest/index_manifest_file_handler.cpp +++ b/src/paimon/core/manifest/index_manifest_file_handler.cpp @@ -56,7 +56,7 @@ std::vector IndexManifestFileHandler::BucketedCombiner::Comb index_entries.erase(entry.first); } for (const auto& entry : added) { - index_entries.emplace(entry.first, entry.second); + index_entries.insert_or_assign(entry.first, entry.second); } std::vector result_entries; diff --git a/src/paimon/core/schema/schema_validation_test.cpp b/src/paimon/core/schema/schema_validation_test.cpp index f9043165e..5f12e09a0 100644 --- a/src/paimon/core/schema/schema_validation_test.cpp +++ b/src/paimon/core/schema/schema_validation_test.cpp @@ -910,6 +910,26 @@ TEST(SchemaValidationTest, TestMapStorageLayout) { ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), "not MAP"); } + // Invalid: nested MAP paths are not shared-shredding columns; only top-level columns are + // addressable by fields.<column>.map.storage-layout. 
+ { + auto payload = arrow::field( + "payload", + arrow::struct_({arrow::field("attrs", arrow::map(arrow::utf8(), arrow::int64()))})); + arrow::FieldVector fields = {f0, f1, payload}; + auto schema = arrow::schema(fields); + std::map options = { + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {"fields.payload.attrs.map.storage-layout", "shared-shredding"}}; + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f0", "f1"}, options)); + ASSERT_NOK_WITH_MSG( + SchemaValidation::ValidateTableSchema(*table_schema), + "Column 'payload.attrs' is configured with map.storage-layout but does not exist in " + "table schema."); + } // Valid: default layout on a MAP column { arrow::FieldVector fields = {f0, f1, f2}; diff --git a/test/inte/append_compaction_inte_test.cpp b/test/inte/append_compaction_inte_test.cpp index a25062809..c24d768b6 100644 --- a/test/inte/append_compaction_inte_test.cpp +++ b/test/inte/append_compaction_inte_test.cpp @@ -19,11 +19,14 @@ #include #include +#include "arrow/api.h" #include "arrow/c/bridge.h" #include "gtest/gtest.h" #include "paimon/commit_context.h" #include "paimon/common/data/binary_row.h" +#include "paimon/common/data/shredding/map_shared_shredding_utils.h" #include "paimon/common/factories/io_hook.h" +#include "paimon/common/utils/path_util.h" #include "paimon/common/utils/scope_guard.h" #include "paimon/core/append/bucketed_append_compact_manager.h" #include "paimon/core/io/data_file_meta.h" @@ -34,6 +37,7 @@ #include "paimon/executor.h" #include "paimon/file_store_commit.h" #include "paimon/file_store_write.h" +#include "paimon/format/file_format_factory.h" #include "paimon/result.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/data_generator.h" @@ -245,6 +249,118 @@ TEST_P(AppendCompactionInteTest, TestAppendTableStreamWriteFullCompaction) { } } +TEST_P(AppendCompactionInteTest, 
TestAppendTableStreamWriteFullCompactionWithMapSharedShredding) { + auto file_format = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto map_type = arrow::map(arrow::utf8(), arrow::int64()); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("tags", map_type), + }; + auto schema = arrow::schema(fields); + + std::map options = { + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "1"}, + {Options::BUCKET_KEY, "id"}, + {Options::FILE_SYSTEM, "local"}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "64"}, + }; + ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, + /*is_streaming_mode=*/true)); + + ASSERT_OK_AND_ASSIGN(auto batch_0, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [1, [["a", 10], ["b", 20]]], + [2, [["c", 30]]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + int64_t commit_identifier = 0; + ASSERT_OK(helper->WriteAndCommit(std::move(batch_0), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK_AND_ASSIGN(auto batch_1, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [3, [["a", 40], ["d", 50]]], + [4, null] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_1), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK_AND_ASSIGN(auto batch_2, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [5, [["e", 60], ["f", 70], ["g", 80], ["h", 90]]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_2), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK(helper->write_->Compact(/*partition=*/{}, /*bucket=*/0, + /*full_compaction=*/true)); + 
ASSERT_OK_AND_ASSIGN( + std::vector> commit_messages, + helper->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier)); + ASSERT_FALSE(commit_messages.empty()); + ASSERT_OK(helper->commit_->Commit(commit_messages, commit_identifier)); + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(Snapshot::CommitKind::Compact(), snapshot.value().GetCommitKind()); + + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_EQ(data_splits.size(), 1); + { + // check adaptive k + auto data_split = std::dynamic_pointer_cast(data_splits[0]); + ASSERT_TRUE(data_split); + ASSERT_EQ(data_split->DataFiles().size(), 1); + auto compact_file = data_split->DataFiles()[0]; + std::string compact_file_path = + PathUtil::JoinPath(data_split->BucketPath(), compact_file->file_name); + ASSERT_OK_AND_ASSIGN(auto unique_input_stream, + dir->GetFileSystem()->Open(compact_file_path)); + std::shared_ptr input_stream(std::move(unique_input_stream)); + ASSERT_OK_AND_ASSIGN(auto file_format_obj, FileFormatFactory::Get(file_format, options)); + ASSERT_OK_AND_ASSIGN(auto reader_builder, file_format_obj->CreateReaderBuilder(10)); + ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(input_stream)); + ASSERT_OK_AND_ASSIGN(auto c_file_schema, reader->GetFileSchema()); + auto file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie(); + auto tags_field = file_schema->GetFieldByName("tags"); + ASSERT_TRUE(tags_field); + ASSERT_TRUE(tags_field->metadata()); + ASSERT_OK_AND_ASSIGN( + auto tags_meta, + MapSharedShreddingUtils::DeserializeMetadata( + tags_field->metadata()->Copy(), MapSharedShreddingDefine::kDefaultDictCompression)); + ASSERT_EQ(4, tags_meta.num_columns); + ASSERT_EQ(4, tags_meta.max_row_width); + } + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", 
arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits, + R"([ + [0, 1, [["a", 10], ["b", 20]]], + [0, 2, [["c", 30]]], + [0, 3, [["a", 40], ["d", 50]]], + [0, 4, null], + [0, 5, [["e", 60], ["f", 70], ["g", 80], ["h", 90]]] + ])")); + ASSERT_TRUE(success); +} + TEST_P(AppendCompactionInteTest, TestAppendTableStreamWriteFullCompactionWithDv) { auto dir = UniqueTestDirectory::Create(); ASSERT_TRUE(dir); diff --git a/test/inte/data_evolution_table_test.cpp b/test/inte/data_evolution_table_test.cpp index 4d0cbaf7e..83e0c232b 100644 --- a/test/inte/data_evolution_table_test.cpp +++ b/test/inte/data_evolution_table_test.cpp @@ -17,6 +17,7 @@ #include "gtest/gtest.h" #include "paimon/common/factories/io_hook.h" #include "paimon/common/table/special_fields.h" +#include "paimon/common/types/data_field.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/scope_guard.h" @@ -46,9 +47,10 @@ class DataEvolutionTableTest : public ::testing::Test, dir_.reset(); } - void CreateTable(const std::vector& partition_keys, + void CreateTable(const arrow::FieldVector& fields, + const std::vector& partition_keys, const std::map& options) const { - auto schema = arrow::schema(fields_); + auto schema = arrow::schema(fields); ::ArrowSchema c_schema; ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); @@ -59,6 +61,11 @@ class DataEvolutionTableTest : public ::testing::Test, /*ignore_if_exists=*/false)); } + void CreateTable(const std::vector& partition_keys, + const std::map& options) const { + CreateTable(fields_, partition_keys, options); + } + void CreateTable(const std::vector& partition_keys) const { std::map options = {{Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, GetParam()}, @@ -510,6 +517,149 @@ TEST_P(DataEvolutionTableTest, TestOnlySomeColumns) { } } +TEST_P(DataEvolutionTableTest, 
TestMultipleSharedShreddingMapsPartialOverwrite) { + if (GetParam() != "parquet" && GetParam() != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), arrow::int64()); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("map1", map_type), + arrow::field("map2", map_type), + }; + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, GetParam()}, + {Options::FILE_SYSTEM, "local"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {"fields.map1.map.storage-layout", "shared-shredding"}, + {"fields.map1.map.shared-shredding.max-columns", "1"}, + {"fields.map2.map.storage-layout", "shared-shredding"}, + {"fields.map2.map.shared-shredding.max-columns", "1"}, + }; + CreateTable(fields, /*partition_keys=*/{}, options); + std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + auto schema = arrow::schema(fields); + + std::vector write_cols0 = {"id", "map1"}; + auto src_array0 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0], fields[1]}), R"([ + [1, [["a", 10], ["b", 20]]], + [11, [["a", 11], ["b", 21]]] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(auto commit_msgs0, WriteArray(table_path, write_cols0, src_array0)); + ASSERT_OK(Commit(table_path, commit_msgs0)); + + std::vector write_cols1 = {"id", "map2"}; + auto src_array1 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0], fields[2]}), R"([ + [2, [["c", 30], ["d", 40]]], + [12, [["c", 31], ["d", 41]]] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(auto commit_msgs1, WriteArray(table_path, write_cols1, src_array1)); + SetFirstRowId(/*reset_first_row_id=*/0, commit_msgs1); + ASSERT_OK(Commit(table_path, commit_msgs1)); + + std::vector write_cols2 = {"map1"}; + auto src_array2 = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[1]}), R"([ + [[["b", 
200], ["a", 100]]], + [[["b", 201], ["a", 101]]] + ])") + .ValueOrDie()); + ASSERT_OK_AND_ASSIGN(auto commit_msgs2, WriteArray(table_path, write_cols2, src_array2)); + SetFirstRowId(/*reset_first_row_id=*/0, commit_msgs2); + ASSERT_OK(Commit(table_path, commit_msgs2)); + + // Read all columns and merge values from all partial files. + auto expected_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [2, [["a", 100], ["b", 200]], [["c", 30], ["d", 40]]], + [12, [["a", 101], ["b", 201]], [["c", 31], ["d", 41]]] + ])") + .ValueOrDie()); + ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array)); + + // Read a subset of columns and recall only the requested shared-shredding MAP column. + auto expected_column_pruned_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0], fields[2]}), R"([ + [2, [["c", 30], ["d", 40]]], + [12, [["c", 31], ["d", 41]]] + ])") + .ValueOrDie()); + ASSERT_OK(ScanAndRead(table_path, {"id", "map2"}, expected_column_pruned_array)); + + // Read selected keys from both shared-shredding MAP columns after partial overwrite merge. 
+ { + auto map1_selected_keys = + arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"b"}); + auto map2_selected_keys = + arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"d"}); + auto read_schema = arrow::schema({ + fields[0], + fields[1]->WithMetadata(map1_selected_keys), + fields[2]->WithMetadata(map2_selected_keys), + }); + auto c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok()); + + ScanContextBuilder scan_context_builder(table_path); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + + ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetReadSchema(std::move(c_schema)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr read_context, + read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto actual, ReadResultCollector::CollectResult(batch_reader.get())); + + auto expected_type = arrow::struct_({ + SpecialFields::ValueKind().field_, + fields[0], + fields[1], + fields[2], + }); + auto expected = arrow::ipc::internal::json::ArrayFromJSON(expected_type, R"([ + [0, 2, [["b", 200]], [["d", 40]]], + [0, 12, [["b", 201]], [["d", 41]]] + ])") + .ValueOrDie(); + auto expected_chunked = std::make_shared(expected); + ASSERT_TRUE(expected_chunked->Equals(actual)) + << "actual=" << actual->ToString() << "\nexpected=" << expected_chunked->ToString(); + } + + // Read a subset of rows after merging values from all partial files. 
+ auto expected_partial_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [12, [["a", 101], ["b", 201]], [["c", 31], ["d", 41]]] + ])") + .ValueOrDie()); + ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_partial_array, + /*predicate=*/nullptr, + /*row_ranges=*/{Range(1l, 1l)})); + + // Read row tracking fields and verify the latest partial overwrite sequence number. + auto expected_row_tracking_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON( + arrow::struct_({fields[0], fields[1], fields[2], SpecialFields::RowId().field_, + SpecialFields::SequenceNumber().field_}), + R"([ + [2, [["a", 100], ["b", 200]], [["c", 30], ["d", 40]], 0, 3], + [12, [["a", 101], ["b", 201]], [["c", 31], ["d", 41]], 1, 3] + ])") + .ValueOrDie()); + ASSERT_OK(ScanAndRead(table_path, {"id", "map1", "map2", "_ROW_ID", "_SEQUENCE_NUMBER"}, + expected_row_tracking_array)); +} + TEST_P(DataEvolutionTableTest, TestNullValues) { CreateTable(); std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); diff --git a/test/inte/nested_column_pruning_inte_test.cpp b/test/inte/nested_column_pruning_inte_test.cpp index 45ff02d36..abc2eae95 100644 --- a/test/inte/nested_column_pruning_inte_test.cpp +++ b/test/inte/nested_column_pruning_inte_test.cpp @@ -26,17 +26,21 @@ #include "arrow/c/bridge.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" +#include "paimon/commit_context.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/string_utils.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/schema/table_schema.h" #include "paimon/defs.h" +#include "paimon/file_store_commit.h" +#include "paimon/file_store_write.h" #include "paimon/fs/file_system_factory.h" #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/read_context.h" #include 
"paimon/reader/batch_reader.h" +#include "paimon/record_batch.h" #include "paimon/result.h" #include "paimon/scan_context.h" #include "paimon/status.h" @@ -47,6 +51,7 @@ #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/test_helper.h" #include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" namespace paimon { class DataSplit; @@ -80,6 +85,40 @@ class NestedColumnPruningInteTest : public ::testing::Test, ASSERT_TRUE(is_equal); } + void ScanReadAndCheck(const std::string& table_path, + const std::shared_ptr& expected_schema, + const std::string& expected_json, + const std::shared_ptr& predicate = nullptr) const { + ScanContextBuilder scan_context_builder(table_path); + scan_context_builder.AddOption(Options::SCAN_MODE, StartupMode::LatestFull().ToString()); + if (predicate) { + scan_context_builder.SetPredicate(predicate); + } + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_FALSE(result_plan->Splits().empty()); + + auto c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(*expected_schema, c_schema.get()).ok()); + ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetReadSchema(std::move(c_schema)); + if (predicate) { + read_context_builder.SetPredicate(predicate); + } + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto actual, ReadResultCollector::CollectResult(batch_reader.get())); + + arrow::FieldVector expected_fields = expected_schema->fields(); + expected_fields.insert(expected_fields.begin(), arrow::field("_VALUE_KIND", arrow::int8())); + auto expected_type 
= arrow::struct_(expected_fields); + auto expected = std::make_shared( + arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_json).ValueOrDie()); + AssertChunkedArrayEquals(expected, actual); + } + protected: std::string file_format_; std::string test_dir_; @@ -127,11 +166,6 @@ TEST_P(NestedColumnPruningInteTest, PruneStructSubFields) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - // Scan to get splits - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - // Build projected schema: only read f0 (full) and f1.a (sub-field of struct) auto pruned_struct_type = arrow::struct_({ arrow::field("a", arrow::int32()), @@ -142,36 +176,12 @@ TEST_P(NestedColumnPruningInteTest, PruneStructSubFields) { }; auto projected_schema = arrow::schema(projected_fields); - // Export to C ArrowSchema - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - // Read with projected schema - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - // Expected: struct with _VALUE_KIND, f0, f1{a} - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::struct_({arrow::field("a", arrow::int32())})), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, 
[10]], [0, 2, [20]], [0, 3, [30]], [0, 4, [40]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Projecting a STRUCT column as empty struct should return this column @@ -211,10 +221,6 @@ TEST_P(NestedColumnPruningInteTest, ProjectStructColumnAsEmptyStructReturnsNullC helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - // Project f1 as empty struct. arrow::FieldVector projected_fields = { arrow::field("f0", arrow::int32()), @@ -222,31 +228,11 @@ TEST_P(NestedColumnPruningInteTest, ProjectStructColumnAsEmptyStructReturnsNullC }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - auto expected_type = arrow::struct_({ - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::struct_({})), - }); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, null], [0, 2, null], [0, 3, null] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, 
expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Two top-level struct columns have the same nested field name; projection should @@ -292,10 +278,6 @@ TEST_P(NestedColumnPruningInteTest, PruneSameNestedFieldNameFromDifferentStructC helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - // Project only s0.f1 and s1.f1; both nested field names are identical. auto projected_s0 = arrow::struct_({arrow::field("f1", arrow::int32())}); auto projected_s1 = arrow::struct_({arrow::field("f1", arrow::int32())}); @@ -306,33 +288,11 @@ TEST_P(NestedColumnPruningInteTest, PruneSameNestedFieldNameFromDifferentStructC }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("s0", arrow::struct_({arrow::field("f1", arrow::int32())})), - arrow::field("s1", arrow::struct_({arrow::field("f1", arrow::int32())})), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, 
projected_schema, R"([ [0, 1, [11], [101]], [0, 2, [22], [202]], [0, 3, [33], [303]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Querying only non-existent struct sub-fields should fail fast. @@ -674,9 +634,6 @@ TEST_P(NestedColumnPruningInteTest, PruneEntireStructField) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - // Only read f0 and f2, skip f1 entirely. arrow::FieldVector projected_fields = { arrow::field("f0", arrow::int32()), @@ -684,32 +641,11 @@ TEST_P(NestedColumnPruningInteTest, PruneEntireStructField) { }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f2", arrow::float64()), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 100, 0.1], [0, 200, 0.2], [0, 300, 0.3] - ])"; - auto expected_array = - 
arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Nested struct — prune sub-fields of a struct inside another struct. @@ -753,9 +689,6 @@ TEST_P(NestedColumnPruningInteTest, PruneDeepNestedStruct) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - // Projected: f0, f1{inner{x}} — skip f1.a and f1.inner.y auto pruned_inner = arrow::struct_({ arrow::field("x", arrow::int64()), @@ -769,36 +702,11 @@ TEST_P(NestedColumnPruningInteTest, PruneDeepNestedStruct) { }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::struct_({ - arrow::field("inner", arrow::struct_({ - arrow::field("x", arrow::int64()), - })), - })), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [[100]]], [0, 2, [[200]]], [0, 3, [[300]]] - ])"; - auto expected_array = - 
arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Nested projected schema with special fields under row tracking. @@ -930,11 +838,6 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeys) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - // Scan to get splits - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - // Build projected schema: read f0 and f1 with selected keys "a,c" auto selected_keys_metadata = arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"a,c"}); @@ -944,35 +847,12 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeys) { }; auto projected_schema = arrow::schema(projected_fields); - // Export to C ArrowSchema - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - // Read with projected schema - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - // Expected: only keys "a" and "c" remain in each map - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::map(arrow::utf8(), arrow::int32())), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, 
projected_schema, R"([ [0, 1, [["a", 10], ["c", 30]]], [0, 2, [["a", 100], ["c", 300]]], [0, 3, [["c", 400]]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Selected-keys metadata on MAP nested inside STRUCT should be applied. @@ -1012,10 +892,6 @@ TEST_P(NestedColumnPruningInteTest, NestedMapSelectedKeysInStruct) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - auto selected_keys_metadata = arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"a,c"}); auto projected_struct_type = arrow::struct_({ @@ -1027,34 +903,11 @@ TEST_P(NestedColumnPruningInteTest, NestedMapSelectedKeysInStruct) { }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::struct_({ - arrow::field("m", arrow::map(arrow::utf8(), arrow::int32())), - })), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string 
expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [[ ["a", 10], ["c", 30] ]]], [0, 2, [[ ["a", 100], ["c", 300] ]]], [0, 3, [[ ["c", 400] ]]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Partial STRUCT sub-field recall where one recalled child is MAP with selected keys. @@ -1095,10 +948,6 @@ TEST_P(NestedColumnPruningInteTest, PruneStructSubFieldsWithNestedMapSelectedKey helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - auto selected_keys_metadata = arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"a,c"}); auto projected_struct_type = arrow::struct_({ @@ -1111,35 +960,11 @@ TEST_P(NestedColumnPruningInteTest, PruneStructSubFieldsWithNestedMapSelectedKey }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::struct_({ - arrow::field("m", arrow::map(arrow::utf8(), 
arrow::int32())), - arrow::field("keep", arrow::int64()), - })), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [[ ["a", 10], ["c", 30] ], 1001]], [0, 2, [[ ["a", 100], ["c", 300] ], 1002]], [0, 3, [[ ["c", 400] ], 1003]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Null semantics should be preserved when pruning STRUCT sub-fields and @@ -1183,10 +1008,6 @@ TEST_P(NestedColumnPruningInteTest, PruneStructSubFieldsWithNestedMapSelectedKey helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - auto selected_keys_metadata = arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"a,c"}); auto projected_struct_type = arrow::struct_({ @@ -1199,37 +1020,13 @@ TEST_P(NestedColumnPruningInteTest, PruneStructSubFieldsWithNestedMapSelectedKey }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - 
arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::struct_({ - arrow::field("m", arrow::map(arrow::utf8(), arrow::int32())), - arrow::field("keep", arrow::int64()), - })), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [[ ["a", 10], ["c", 30] ], 1001]], [0, 2, null], [0, 3, [null, 1003]], [0, 4, [[ ["c", 400] ], null]], [0, 5, [[ ["a", 500], ["c", null] ], 1005]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: MAP_SELECTED_KEYS metadata value is empty string, select empty-string map key. @@ -1267,11 +1064,6 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeysEmptyStringKey) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - // Scan to get splits - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - // Build projected schema: read f0 and f1 with selected keys metadata set to empty string. 
auto selected_keys_metadata = arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {""}); @@ -1281,34 +1073,12 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeysEmptyStringKey) { }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - // Read with projected schema - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - // Expected: only empty-string key remains. - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::map(arrow::utf8(), arrow::int32())), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [["", 9]]], [0, 2, [["", 99]]], [0, 3, []] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: MAP_SELECTED_KEYS output map entry order should follow selected key order. 
@@ -1345,10 +1115,6 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeysPreserveOrder) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - ASSERT_FALSE(data_splits.empty()); - // Query key order is c,a and output should follow this order. auto selected_keys_metadata = arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"c,a"}); @@ -1358,32 +1124,127 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeysPreserveOrder) { }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field("f1", arrow::map(arrow::utf8(), arrow::int32())), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [["c", 30], ["a", 10]]], [0, 2, [["c", 300], ["a", 100]]], [0, 3, [["c", 400], ["a", 500]]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto expected_chunked = std::make_shared(expected_array); + ])"); +} + +TEST_P(NestedColumnPruningInteTest, NestedStructMapSelectedKeysWithPredicate) { + if (file_format_ != 
"parquet" && file_format_ != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), arrow::int32()); + auto info_type = arrow::struct_({ + arrow::field("score", arrow::int64()), + arrow::field("label", arrow::utf8()), + arrow::field("drop", arrow::utf8()), + }); + auto payload_type = arrow::struct_({ + arrow::field("attrs", map_type), + arrow::field("info", info_type), + arrow::field("note", arrow::utf8()), + }); + arrow::FieldVector table_fields = { + arrow::field("id", arrow::int32()), + arrow::field("payload", payload_type), + arrow::field("category", arrow::utf8()), + }; + auto table_schema = arrow::schema(table_fields); - AssertChunkedArrayEquals(expected_chunked, read_result); + std::map options = { + {Options::MANIFEST_FORMAT, "AVRO"}, + {Options::FILE_FORMAT, StringUtils::ToUpperCase(file_format_)}, + {Options::TARGET_FILE_SIZE, "1048576"}, + {Options::BUCKET, "-1"}, + {Options::WRITE_BATCH_SIZE, "1"}, + {"parquet.page.size", "1"}, + {"parquet.enable-dictionary", "false"}, + {"parquet.write.enable-page-index", "true"}, + {"parquet.write.max-row-group-length", "1"}, + {"parquet.read.enable-page-index-filter", "true"}, + {"orc.stripe.size", "1"}, + {"orc.row.index.stride", "1"}, + }; + + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(test_dir_, table_schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); + (void)helper; + + WriteContextBuilder write_context_builder(table_path_, "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, + write_context_builder.SetOptions(options).Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); + + auto write_one_row = [&](const std::string& data) -> Status { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(table_fields), data, + /*partition_map=*/{}, /*bucket=*/0, {})); + return file_store_write->Write(std::move(batch)); + }; + + 
ASSERT_OK(write_one_row( + R"([[1, [[["a", 10], ["b", 20], ["c", 30]], [1001, "low", "x"], "n1"], "hot"]])")); + ASSERT_OK(write_one_row( + R"([[12, [[["a", 100], ["c", 300], ["d", 400]], [1002, "mid", "y"], "n2"], "warm"]])")); + ASSERT_OK(write_one_row( + R"([[21, [[["b", 200], ["c", 500], ["a", 600]], [1003, "high", "z"], "n3"], "cold"]])")); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + file_store_write->PrepareCommit(/*wait_compaction=*/false, + /*commit_identifier=*/0)); + ASSERT_OK(file_store_write->Close()); + + CommitContextBuilder commit_context_builder(table_path_, "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr commit_context, + commit_context_builder.SetOptions(options).Finish()); + ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); + ASSERT_OK(commit->Commit(commit_msgs, /*commit_identifier=*/0)); + + // Read selected MAP keys together with nested STRUCT sub-fields. + auto selected_keys_metadata = + arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"c,a"}); + auto selected_payload_type = arrow::struct_({ + arrow::field("attrs", map_type)->WithMetadata(selected_keys_metadata), + arrow::field("info", arrow::struct_({arrow::field("score", arrow::int64())})), + }); + auto selected_schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("payload", selected_payload_type), + }); + + ScanReadAndCheck(table_path_, selected_schema, R"([ + [0, 1, [[["c", 30], ["a", 10]], [1001]]], + [0, 12, [[["c", 300], ["a", 100]], [1002]]], + [0, 21, [[["c", 500], ["a", 600]], [1003]]] + ])"); + + // Read only part of top-level columns and part of nested STRUCT fields. 
+ auto partial_payload_type = arrow::struct_({ + arrow::field("info", arrow::struct_({arrow::field("label", arrow::utf8())})), + }); + auto partial_schema = arrow::schema({ + arrow::field("payload", partial_payload_type), + arrow::field("category", arrow::utf8()), + }); + + ScanReadAndCheck(table_path_, partial_schema, R"([ + [0, [["low"]], "hot"], + [0, [["mid"]], "warm"], + [0, [["high"]], "cold"] + ])"); + + // Read selected nested fields with predicate pushdown. + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"id", + FieldType::INT, Literal(10)); + + ScanReadAndCheck(table_path_, selected_schema, R"([ + [0, 12, [[["c", 300], ["a", 100]], [1002]]], + [0, 21, [[["c", 500], ["a", 600]], [1003]]] + ])", + predicate); } // Test: ORC dictionary-encoded map key/value should work with MAP_SELECTED_KEYS. @@ -1431,8 +1292,8 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeysWithOrcDictionaryEncodedMap) helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto data_splits, helper->NewScan(StartupMode::LatestFull(), + /*snapshot_id=*/std::nullopt)); ASSERT_FALSE(data_splits.empty()); auto selected_keys_metadata = @@ -1478,7 +1339,8 @@ TEST_P(NestedColumnPruningInteTest, MapSelectedKeysWithOrcDictionaryEncodedMap) AssertChunkedArrayEquals(expected_chunked, actual_chunked); } -// Test: Deeper nested struct — prune sub-fields of a struct inside a struct inside another struct. +// Test: Deeper nested struct — prune sub-fields of a struct inside a struct inside another +// struct. 
TEST_P(NestedColumnPruningInteTest, PruneDeeperNestedStruct) { // Table schema: f0 (int32), f1 (struct{a: int32, inner1: struct{x: int64, inner2: struct{p: // utf8, q: float64}}}) @@ -1524,9 +1386,6 @@ TEST_P(NestedColumnPruningInteTest, PruneDeeperNestedStruct) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); - // Projected: f0, f1{inner1{inner2{p}}} auto pruned_inner2 = arrow::struct_({ arrow::field("p", arrow::utf8()), @@ -1543,40 +1402,11 @@ TEST_P(NestedColumnPruningInteTest, PruneDeeperNestedStruct) { }; auto projected_schema = arrow::schema(projected_fields); - auto c_schema = std::make_unique(); - ASSERT_TRUE(arrow::ExportSchema(*projected_schema, c_schema.get()).ok()); - - ReadContextBuilder read_context_builder(table_path_); - read_context_builder.SetOptions(options).SetReadSchema(std::move(c_schema)); - ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); - ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(data_splits)); - ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); - - arrow::FieldVector expected_fields = { - arrow::field("_VALUE_KIND", arrow::int8()), - arrow::field("f0", arrow::int32()), - arrow::field( - "f1", arrow::struct_({ - arrow::field("inner1", - arrow::struct_({ - arrow::field("inner2", arrow::struct_({ - arrow::field("p", arrow::utf8()), - })), - })), - })), - }; - auto expected_type = arrow::struct_(expected_fields); - std::string expected_data = R"([ + ScanReadAndCheck(table_path_, projected_schema, R"([ [0, 1, [[[ "ppp" ]]]], [0, 2, [[[ "qqq" ]]]], [0, 3, [[[ "rrr" ]]]] - ])"; - auto expected_array = - arrow::ipc::internal::json::ArrayFromJSON(expected_type, expected_data).ValueOrDie(); - auto 
expected_chunked = std::make_shared(expected_array); - - AssertChunkedArrayEquals(expected_chunked, read_result); + ])"); } // Test: Nested pruning for LIST> in integration path. @@ -1617,8 +1447,8 @@ TEST_P(NestedColumnPruningInteTest, PruneListStructSubFields) { helper->WriteAndCommit(std::move(batch), commit_identifier++, /*expected_commit_messages=*/std::nullopt)); - ASSERT_OK_AND_ASSIGN(auto data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto data_splits, helper->NewScan(StartupMode::LatestFull(), + /*snapshot_id=*/std::nullopt)); ASSERT_FALSE(data_splits.empty()); auto pruned_list_elem_struct = arrow::struct_({arrow::field("x", arrow::int64())}); diff --git a/test/inte/pk_compaction_inte_test.cpp b/test/inte/pk_compaction_inte_test.cpp index d584e17b9..fad7e61dd 100644 --- a/test/inte/pk_compaction_inte_test.cpp +++ b/test/inte/pk_compaction_inte_test.cpp @@ -42,11 +42,15 @@ #include "paimon/fs/file_system.h" #include "paimon/fs/local/local_file_system.h" #include "paimon/memory/memory_pool.h" +#include "paimon/predicate/literal.h" +#include "paimon/predicate/predicate_builder.h" #include "paimon/read_context.h" #include "paimon/record_batch.h" #include "paimon/result.h" +#include "paimon/scan_context.h" #include "paimon/status.h" #include "paimon/table/source/table_read.h" +#include "paimon/table/source/table_scan.h" #include "paimon/testing/utils/binary_row_generator.h" #include "paimon/testing/utils/data_generator.h" #include "paimon/testing/utils/io_exception_helper.h" @@ -224,13 +228,23 @@ class PkCompactionInteTest : public ::testing::Test, void ScanAndVerify(const std::string& table_path, const arrow::FieldVector& fields, const std::map, std::string>& - expected_data_per_partition_bucket) { + expected_data_per_partition_bucket, + const std::shared_ptr& predicate = nullptr) { std::map options = {{Options::FILE_SYSTEM, "local"}}; ASSERT_OK_AND_ASSIGN(auto helper, 
TestHelper::Create(table_path, options, /*is_streaming_mode=*/false)); - ASSERT_OK_AND_ASSIGN( - std::vector> data_splits, - helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ScanContextBuilder scan_context_builder(table_path); + scan_context_builder.WithStreamingMode(false).SetOptions(options).AddOption( + Options::SCAN_MODE, StartupMode::LatestFull().ToString()); + if (predicate) { + scan_context_builder.SetPredicate(predicate); + } + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan_context, + scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr table_scan, + TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr result_plan, table_scan->CreatePlan()); + std::vector> data_splits = result_plan->Splits(); arrow::FieldVector fields_with_row_kind = fields; fields_with_row_kind.insert(fields_with_row_kind.begin(), @@ -253,8 +267,31 @@ class PkCompactionInteTest : public ::testing::Test, auto iter = expected_data_per_partition_bucket.find(key); ASSERT_TRUE(iter != expected_data_per_partition_bucket.end()) << "Unexpected partition=" << key.first << " bucket=" << key.second; - ASSERT_OK_AND_ASSIGN(bool success, - helper->ReadAndCheckResult(data_type, splits, iter->second)); + ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetOptions(options); + if (predicate) { + read_context_builder.SetPredicate(predicate); + } + ASSERT_OK_AND_ASSIGN(std::unique_ptr read_context, + read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr table_read, + TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch_reader, + table_read->CreateReader(splits)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr read_result, + ReadResultCollector::CollectResult(batch_reader.get())); + auto expected_array = + arrow::ipc::internal::json::ArrayFromJSON(data_type, iter->second).ValueOrDie(); + auto expected_chunk_array = std::make_shared(expected_array); + + bool success = 
expected_chunk_array->Equals(read_result); + if (!success) { + std::cout << "[expected_data_type]" << expected_chunk_array->type()->ToString() + << std::endl; + std::cout << "[actual_data_type]" << read_result->type()->ToString() << std::endl; + std::cout << "[expected]:" << expected_chunk_array->ToString() << std::endl; + std::cout << "[actual]: " << read_result->ToString() << std::endl; + } ASSERT_TRUE(success); } } @@ -318,6 +355,190 @@ class PkCompactionInteTest : public ::testing::Test, arrow::FieldVector fields_; }; +// Verify shared-shredding MAP can be read correctly after PK full compaction. +TEST_P(PkCompactionInteTest, TestKeyValueTableFullCompactionWithMapSharedShredding) { + auto file_format = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), arrow::int64()); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("tags", map_type), + }; + std::vector primary_keys = {"id"}; + std::vector partition_keys = {}; + std::map options = { + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "1"}, + {Options::BUCKET_KEY, "id"}, + {Options::FILE_SYSTEM, "local"}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "2"}, + }; + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(dir_->Str(), arrow::schema(fields), partition_keys, + primary_keys, options, /*is_streaming_mode=*/true)); + + int64_t commit_identifier = 0; + ASSERT_OK_AND_ASSIGN(auto batch_0, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [1, [["a", 10], ["b", 20]]], + [2, [["c", 30]]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_0), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK_AND_ASSIGN(auto batch_1, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [1, [["a", 100], ["d", 400]]], + [3, [["e", 50]]] + ])", + 
/*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_1), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK(helper->write_->Compact(/*partition=*/{}, /*bucket=*/0, + /*full_compaction=*/true)); + ASSERT_OK_AND_ASSIGN( + std::vector> commit_messages, + helper->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier)); + ASSERT_FALSE(commit_messages.empty()); + ASSERT_OK(helper->commit_->Commit(commit_messages, commit_identifier)); + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(Snapshot::CommitKind::Compact(), snapshot.value().GetCommitKind()); + + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits, + R"([ + [0, 1, [["a", 100], ["d", 400]]], + [0, 2, [["c", 30]]], + [0, 3, [["e", 50]]] + ])")); + ASSERT_TRUE(success); +} + +TEST_P(PkCompactionInteTest, TestKeyValueTableDvCompactionWithMapSharedShredding) { + auto file_format = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), arrow::int64()); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("tags", map_type), + arrow::field("padding", arrow::utf8()), + }; + std::vector primary_keys = {"id"}; + std::vector partition_keys = {}; + std::map options = { + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "1"}, + {Options::BUCKET_KEY, "id"}, + {Options::FILE_SYSTEM, "local"}, + {Options::FILE_COMPRESSION, "none"}, + {Options::DELETION_VECTORS_ENABLED, "true"}, + 
{"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "2"}, + {"parquet.page.size", "1"}, + {"parquet.enable-dictionary", "false"}, + {"parquet.write.enable-page-index", "true"}, + {"parquet.write.max-row-group-length", "1"}, + {"parquet.read.enable-page-index-filter", "true"}, + {"orc.stripe.size", "1"}, + {"orc.row.index.stride", "1"}, + }; + CreateTable(fields, partition_keys, primary_keys, options); + std::string table_path = TablePath(); + auto data_type = arrow::struct_(fields); + int64_t commit_id = 0; + std::string padding(2048, 'X'); + + { + // clang-format off + std::string json_data = R"([ +[1, [["a", 10], ["b", 20]], ")" + padding + R"("], +[2, [["c", 30]], ")" + padding + R"("], +[3, [["d", 40]], ")" + padding + R"("], +[4, null, ")" + padding + R"("], +[6, [["j", 60], ["k", 70]], ")" + padding + R"("], +[7, [["l", 80], ["m", 90], ["n", 100]], ")" + padding + R"("], +[8, [["o", 110]], ")" + padding + R"("] +])"; + // clang-format on + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, json_data).ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, commit_id++)); + } + + ASSERT_OK_AND_ASSIGN( + auto upgrade_msgs, + CompactAndCommit(table_path, {}, 0, /*full_compaction=*/true, commit_id++)); + ASSERT_FALSE(HasDeletionVectorIndexFiles(upgrade_msgs)) + << "Initial full compact should not produce DV index files"; + + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + [1, [["a", 100], ["e", 500]], "u1"], + [5, [["h", 80]], "u5"] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, commit_id++)); + } + + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + [2, [["c", 300], ["f", 600], ["g", 700]], "u2"], + [5, [["h", 800], ["i", 900]], "u5-new"] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, commit_id++)); + } + + ASSERT_OK_AND_ASSIGN( + auto dv_compact_msgs, + 
CompactAndCommit(table_path, {}, 0, /*full_compaction=*/false, commit_id++)); + ASSERT_TRUE(HasDeletionVectorIndexFiles(dv_compact_msgs)) + << "Non-full compact should produce DV index files"; + + std::map, std::string> expected_data; + // clang-format off + expected_data[std::make_pair("", 0)] = R"([ +[0, 3, [["d", 40]], ")" + padding + R"("], +[0, 4, null, ")" + padding + R"("], +[0, 6, [["j", 60], ["k", 70]], ")" + padding + R"("], +[0, 7, [["l", 80], ["m", 90], ["n", 100]], ")" + padding + R"("], +[0, 8, [["o", 110]], ")" + padding + R"("], +[0, 1, [["a", 100], ["e", 500]], "u1"], +[0, 2, [["c", 300], ["f", 600], ["g", 700]], "u2"], +[0, 5, [["h", 800], ["i", 900]], "u5-new"] +])"; + // clang-format on + ScanAndVerify(table_path, fields, expected_data); + + // read with predicate and dv bitmap + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"id", + FieldType::INT, Literal(6)); + std::map, std::string> expected_predicate_data; + // clang-format off + expected_predicate_data[std::make_pair("", 0)] = R"([ +[0, 7, [["l", 80], ["m", 90], ["n", 100]], ")" + padding + R"("], +[0, 8, [["o", 110]], ")" + padding + R"("] +])"; + // clang-format on + ScanAndVerify(table_path, fields, expected_predicate_data, predicate); +} + // Test: deduplicate merge engine with deletion vectors enabled. // Verifies that a non-full compact produces DV index files when level-0 files // overlap with high-level files, and that data is correct after DV compact and full compact. 
@@ -1995,13 +2216,21 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithDvInAllLevels) { ASSERT_OK_AND_ASSIGN(auto compact_msgs, CompactAndCommit(table_path, {{"f1", "10"}}, 0, /*full_compaction=*/false, commit_id++)); - ASSERT_TRUE(HasDeletionVectorIndexFiles(compact_msgs)) - << "Non-full compact #1 must produce DV for Alice/Bob in L5"; + ASSERT_TRUE(HasDeletionVectorIndexFiles(compact_msgs)); + + std::map, std::string> expected_data; + expected_data[std::make_pair("f1=10/", 0)] = R"([ + [0, "Carol", 10, 0, 3.0, ")" + padding + R"("], + [0, "Dave", 10, 0, 4.0, ")" + padding + R"("], + [0, "Eve", 10, 0, 5.0, ")" + padding + R"("], + [0, "Alice", 10, 0, 10.0, "v2a"], + [0, "Bob", 10, 0, 20.0, "v2b"] + ])"; + ScanAndVerify(table_path, fields, expected_data); } // Step 3: Write batch_3 (overlap Bob/Carol) → non-full compact. - // L0 merges to a lower intermediate level; DV marks Bob in the intermediate file - // from Step 2, and Carol in L5. + // DV marks Carol in L5. { auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ ["Bob", 10, 0, 200.0, "v3b"], @@ -2013,8 +2242,16 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithDvInAllLevels) { ASSERT_OK_AND_ASSIGN(auto compact_msgs, CompactAndCommit(table_path, {{"f1", "10"}}, 0, /*full_compaction=*/false, commit_id++)); - ASSERT_TRUE(HasDeletionVectorIndexFiles(compact_msgs)) - << "Non-full compact #2 must produce DV for Bob (intermediate) and Carol (L5)"; + ASSERT_TRUE(HasDeletionVectorIndexFiles(compact_msgs)); + std::map, std::string> expected_data; + expected_data[std::make_pair("f1=10/", 0)] = R"([ + [0, "Dave", 10, 0, 4.0, ")" + padding + R"("], + [0, "Eve", 10, 0, 5.0, ")" + padding + R"("], + [0, "Alice", 10, 0, 10.0, "v2a"], + [0, "Bob", 10, 0, 200.0, "v3b"], + [0, "Carol", 10, 0, 300.0, "v3c"] + ])"; + ScanAndVerify(table_path, fields, expected_data); } // Step 4: Write batch_4 (overlap Carol/Dave) → non-full compact. 
@@ -2030,8 +2267,16 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithDvInAllLevels) { ASSERT_OK_AND_ASSIGN(auto compact_msgs, CompactAndCommit(table_path, {{"f1", "10"}}, 0, /*full_compaction=*/false, commit_id++)); - ASSERT_TRUE(HasDeletionVectorIndexFiles(compact_msgs)) - << "Non-full compact #3 must produce DV for Carol (intermediate) and Dave (L5)"; + ASSERT_TRUE(HasDeletionVectorIndexFiles(compact_msgs)); + std::map, std::string> expected_data; + expected_data[std::make_pair("f1=10/", 0)] = R"([ + [0, "Eve", 10, 0, 5.0, ")" + padding + R"("], + [0, "Alice", 10, 0, 10.0, "v2a"], + [0, "Bob", 10, 0, 200.0, "v3b"], + [0, "Carol", 10, 0, 3000.0, "v4c"], + [0, "Dave", 10, 0, 4000.0, "v4d"] + ])"; + ScanAndVerify(table_path, fields, expected_data); } // Step 5: Write batch_5 (overlap Dave/Eve) → leave at L0 (no compact). @@ -2045,11 +2290,11 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithDvInAllLevels) { ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, commit_id++)); std::map, std::string> expected_data; expected_data[std::make_pair("f1=10/", 0)] = R"([ + [0, "Eve", 10, 0, 5.0, ")" + padding + R"("], [0, "Alice", 10, 0, 10.0, "v2a"], [0, "Bob", 10, 0, 200.0, "v3b"], [0, "Carol", 10, 0, 3000.0, "v4c"], - [0, "Dave", 10, 0, 40000.0, "v5d"], - [0, "Eve", 10, 0, 50000.0, "v5e"] + [0, "Dave", 10, 0, 4000.0, "v4d"] ])"; ScanAndVerify(table_path, fields, expected_data); } @@ -2058,9 +2303,7 @@ TEST_F(PkCompactionInteTest, TestDeduplicateWithDvInAllLevels) { ASSERT_OK_AND_ASSIGN( auto final_compact_msgs, CompactAndCommit(table_path, {{"f1", "10"}}, 0, /*full_compaction=*/true, commit_id++)); - } - // Step 7: ScanAndVerify after full compact (globally sorted, all data in L5). - { + // Step 7: ScanAndVerify after full compact (globally sorted, all data in L5). 
std::map, std::string> expected_data; expected_data[std::make_pair("f1=10/", 0)] = R"([ [0, "Alice", 10, 0, 10.0, "v2a"], diff --git a/test/inte/write_and_read_inte_test.cpp b/test/inte/write_and_read_inte_test.cpp index fda84da90..da3dcc6b7 100644 --- a/test/inte/write_and_read_inte_test.cpp +++ b/test/inte/write_and_read_inte_test.cpp @@ -27,18 +27,27 @@ #include "arrow/c/bridge.h" #include "arrow/ipc/json_simple.h" #include "arrow/type.h" +#include "fmt/format.h" #include "gtest/gtest.h" +#include "paimon/commit_context.h" +#include "paimon/common/data/shredding/map_shared_shredding_utils.h" #include "paimon/common/reader/reader_utils.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/string_utils.h" +#include "paimon/core/io/data_file_meta.h" #include "paimon/core/schema/schema_manager.h" +#include "paimon/core/table/source/data_split_impl.h" #include "paimon/defs.h" +#include "paimon/file_store_commit.h" +#include "paimon/file_store_write.h" +#include "paimon/format/file_format_factory.h" #include "paimon/fs/file_system.h" #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/read_context.h" #include "paimon/reader/batch_reader.h" +#include "paimon/record_batch.h" #include "paimon/result.h" #include "paimon/scan_context.h" #include "paimon/status.h" @@ -49,6 +58,7 @@ #include "paimon/testing/utils/read_result_collector.h" #include "paimon/testing/utils/test_helper.h" #include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" namespace paimon::test { // This is a sdk end-to-end test demo that supports write, commit, scan, and read operations. 
@@ -143,6 +153,65 @@ class WriteAndReadInteTest return table_scan->CreatePlan(); } + Result>>> CurrentDataFiles( + const std::map& options) const { + PAIMON_ASSIGN_OR_RAISE(auto plan, InnerScan(options)); + std::vector>> files; + for (const auto& split : plan->Splits()) { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("split cannot be cast to DataSplitImpl"); + } + for (const auto& data_file : data_split->DataFiles()) { + files.emplace_back(data_split->BucketPath(), data_file); + } + } + std::sort(files.begin(), files.end(), [](const auto& left, const auto& right) { + return left.second->min_sequence_number < right.second->min_sequence_number; + }); + return files; + } + + Result> ReadDataFileSchema( + const std::string& bucket_path, const std::shared_ptr& file, + const std::map& options) const { + std::string file_path = PathUtil::JoinPath(bucket_path, file->file_name); + PAIMON_ASSIGN_OR_RAISE(auto unique_input_stream, dir_->GetFileSystem()->Open(file_path)); + std::shared_ptr input_stream(std::move(unique_input_stream)); + PAIMON_ASSIGN_OR_RAISE(std::string format_str, file->FileFormat()); + PAIMON_ASSIGN_OR_RAISE(auto file_format, FileFormatFactory::Get(format_str, options)); + PAIMON_ASSIGN_OR_RAISE(auto reader_builder, file_format->CreateReaderBuilder(10)); + PAIMON_ASSIGN_OR_RAISE(auto reader, reader_builder->Build(input_stream)); + PAIMON_ASSIGN_OR_RAISE(auto c_file_schema, reader->GetFileSchema()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto file_schema, + arrow::ImportSchema(c_file_schema.get())); + return file_schema; + } + + Result ReadShreddingMeta( + const std::pair>& file, + const std::string& field_name, const std::map& options) const { + PAIMON_ASSIGN_OR_RAISE(auto file_schema, + ReadDataFileSchema(file.first, file.second, options)); + std::shared_ptr field = file_schema->GetFieldByName(field_name); + if (!field) { + return Status::Invalid( + fmt::format("field {} not found in data file schema", 
field_name)); + } + std::shared_ptr metadata = field->metadata(); + if (!metadata) { + return Status::Invalid( + fmt::format("field {} has no shared-shredding metadata", field_name)); + } + std::shared_ptr metadata_copy = metadata->Copy(); + if (!MapSharedShreddingUtils::HasShreddingMetadata(metadata_copy)) { + return Status::Invalid( + fmt::format("field {} has no shared-shredding metadata", field_name)); + } + return MapSharedShreddingUtils::DeserializeMetadata( + metadata_copy, MapSharedShreddingDefine::kDefaultDictCompression); + } + private: std::string test_dir_; std::unique_ptr dir_; @@ -1314,6 +1383,429 @@ TEST_P(WriteAndReadInteTest, TestAppendSharedShreddingMap) { ASSERT_TRUE(success); } +TEST_P(WriteAndReadInteTest, TestAppendMapSharedShreddingWithPartitionAndBucket) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("dt", arrow::utf8()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), + }; + auto schema = arrow::schema(fields); + std::map options = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "id"}, + {Options::FILE_SYSTEM, file_system}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "5"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(test_dir_, schema, /*partition_keys=*/{"dt"}, + /*primary_keys=*/{}, options, + /*is_streaming_mode=*/false)); + + ASSERT_OK_AND_ASSIGN( + auto p1_bucket0_first, + TestHelper::MakeRecordBatch(arrow::struct_(fields), R"([[1, "p1", [["a", 1]]]])", + /*partition_map=*/{{"dt", "p1"}}, + /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(p1_bucket0_first), 
/*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + helper.reset(); + ASSERT_OK_AND_ASSIGN(helper, + TestHelper::Create(PathUtil::JoinPath(test_dir_, "foo.db/bar"), options, + /*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN( + auto p1_bucket0_second, + TestHelper::MakeRecordBatch(arrow::struct_(fields), R"([[2, "p1", [["b", 2]]]])", + /*partition_map=*/{{"dt", "p1"}}, + /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(p1_bucket0_second), /*commit_identifier=*/1, + /*expected_commit_messages=*/std::nullopt)); + + helper.reset(); + ASSERT_OK_AND_ASSIGN(helper, + TestHelper::Create(PathUtil::JoinPath(test_dir_, "foo.db/bar"), options, + /*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN( + auto p2_bucket1_first, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([[3, "p2", [["x", 10], ["y", 20], ["z", 30], ["w", 40]]]])", + /*partition_map=*/{{"dt", "p2"}}, /*bucket=*/1, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(p2_bucket1_first), /*commit_identifier=*/2, + /*expected_commit_messages=*/std::nullopt)); + + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_GE(data_splits.size(), 2); + std::string expected_data = R"([ + [0, 1, "p1", [["a", 1]]], + [0, 2, "p1", [["b", 2]]], + [0, 3, "p2", [["w", 40], ["x", 10], ["y", 20], ["z", 30]]] + ])"; + ASSERT_OK_AND_ASSIGN(bool success, + helper->ReadAndCheckResult(data_type, data_splits, expected_data)); + ASSERT_TRUE(success); + + ASSERT_OK_AND_ASSIGN(auto files, CurrentDataFiles(options)); + ASSERT_EQ(3, files.size()); + std::vector>> p1_bucket0_files; + std::vector>> p2_bucket1_files; + for (const auto& file : files) { + if (file.first.find("dt=p1/bucket-0") != 
std::string::npos) { + p1_bucket0_files.push_back(file); + } else if (file.first.find("dt=p2/bucket-1") != std::string::npos) { + p2_bucket1_files.push_back(file); + } + } + ASSERT_EQ(2, p1_bucket0_files.size()); + ASSERT_EQ(1, p2_bucket1_files.size()); + + ASSERT_OK_AND_ASSIGN(auto p1_first_meta, + ReadShreddingMeta(p1_bucket0_files[0], "tags", options)); + ASSERT_EQ(5, p1_first_meta.num_columns); + ASSERT_EQ(1, p1_first_meta.max_row_width); + + ASSERT_OK_AND_ASSIGN(auto p1_second_meta, + ReadShreddingMeta(p1_bucket0_files[1], "tags", options)); + ASSERT_EQ(1, p1_second_meta.num_columns); + ASSERT_EQ(1, p1_second_meta.max_row_width); + + ASSERT_OK_AND_ASSIGN(auto p2_first_meta, + ReadShreddingMeta(p2_bucket1_files[0], "tags", options)); + ASSERT_EQ(5, p2_first_meta.num_columns); + ASSERT_EQ(4, p2_first_meta.max_row_width); +} + +TEST_P(WriteAndReadInteTest, TestAppendMapSharedShreddingWithPredicate) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), arrow::int64())), + }; + auto schema = arrow::schema(fields); + std::map options = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1048576"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, file_system}, + {Options::WRITE_BATCH_SIZE, "1"}, + {"parquet.page.size", "1"}, + {"parquet.enable-dictionary", "false"}, + {"parquet.write.enable-page-index", "true"}, + {"parquet.write.max-row-group-length", "1"}, + {"parquet.read.enable-page-index-filter", "true"}, + {"orc.stripe.size", "1"}, + {"orc.row.index.stride", "1"}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "1"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN( + auto helper, 
TestHelper::Create(test_dir_, schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); + (void)helper; + + std::string table_path = PathUtil::JoinPath(test_dir_, "foo.db/bar"); + WriteContextBuilder write_context_builder(table_path, "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, + write_context_builder.SetOptions(options).Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); + + auto write_one_row = [&](const std::string& data) -> Status { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data, + /*partition_map=*/{}, /*bucket=*/0, {})); + return file_store_write->Write(std::move(batch)); + }; + + ASSERT_OK(write_one_row(R"([[1, [["a", 10], ["b", 20]]]])")); + ASSERT_OK(write_one_row(R"([[12, [["c", 31], ["d", 41]]]])")); + ASSERT_OK(write_one_row(R"([[21, [["e", 50], ["f", 60]]]])")); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + file_store_write->PrepareCommit(/*wait_compaction=*/false, + /*commit_identifier=*/0)); + ASSERT_OK(file_store_write->Close()); + + CommitContextBuilder commit_context_builder(table_path, "commit_user_1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr commit_context, + commit_context_builder.SetOptions(options).Finish()); + ASSERT_OK_AND_ASSIGN(auto commit, FileStoreCommit::Create(std::move(commit_context))); + ASSERT_OK(commit->Commit(commit_msgs, /*commit_identifier=*/0)); + + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"id", + FieldType::INT, Literal(10)); + ScanContextBuilder scan_context_builder(table_path); + scan_context_builder.SetOptions(options) + .AddOption(Options::SCAN_MODE, StartupMode::LatestFull().ToString()) + .SetPredicate(predicate); + ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto 
result_plan, table_scan->CreatePlan()); + ASSERT_FALSE(result_plan->Splits().empty()); + + ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetOptions(options).SetPredicate(predicate); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + ASSERT_OK_AND_ASSIGN(auto actual, ReadResultCollector::CollectResult(batch_reader.get())); + + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto expected_type = arrow::struct_(fields_with_row_kind); + auto expected = std::make_shared( + arrow::ipc::internal::json::ArrayFromJSON(expected_type, R"([ + [0, 12, [["c", 31], ["d", 41]]], + [0, 21, [["e", 50], ["f", 60]]] + ])") + .ValueOrDie()); + ASSERT_TRUE(expected->Equals(actual)) << actual->ToString(); +} + +TEST_P(WriteAndReadInteTest, TestMapSharedShreddingRestoreAdaptiveColumnCountFromFileMetadata) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), arrow::int64()); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("metrics", map_type), + }; + std::map options = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::BUCKET, "1"}, + {Options::BUCKET_KEY, "id"}, + {Options::FILE_SYSTEM, file_system}, + {Options::WRITE_ONLY, "true"}, + {"fields.metrics.map.storage-layout", "shared-shredding"}, + {"fields.metrics.map.shared-shredding.max-columns", "8"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(test_dir_, arrow::schema(fields), 
/*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/false)); + + ASSERT_OK_AND_ASSIGN( + auto batch_v0, TestHelper::MakeRecordBatch(arrow::struct_(fields), R"([[1, [["a", 11]]]])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_v0), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + helper.reset(); + ASSERT_OK_AND_ASSIGN(helper, + TestHelper::Create(PathUtil::JoinPath(test_dir_, "foo.db/bar"), options, + /*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN( + auto batch_v1, TestHelper::MakeRecordBatch(arrow::struct_(fields), R"([[2, [["b", 22]]]])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_v1), /*commit_identifier=*/1, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK_AND_ASSIGN(auto files, CurrentDataFiles(options)); + ASSERT_EQ(2, files.size()); + ASSERT_OK_AND_ASSIGN(auto first_meta, ReadShreddingMeta(files[0], "metrics", options)); + ASSERT_EQ(8, first_meta.num_columns); + ASSERT_EQ(1, first_meta.max_row_width); + ASSERT_OK_AND_ASSIGN(auto second_meta, ReadShreddingMeta(files[1], "metrics", options)); + ASSERT_EQ(1, second_meta.num_columns); + ASSERT_EQ(1, second_meta.max_row_width); + + auto expected_type = arrow::struct_({ + arrow::field("_VALUE_KIND", arrow::int8()), + arrow::field("id", arrow::int32()), + arrow::field("metrics", map_type), + }); + ASSERT_OK_AND_ASSIGN(auto splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(expected_type, splits, + R"([ + [0, 1, [["a", 11]]], + [0, 2, [["b", 22]]] + ])")); + ASSERT_TRUE(success); +} + +TEST_P(WriteAndReadInteTest, TestMapSharedShreddingSwitchMapLayoutAndUseMaxColumnsWithoutMetadata) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), 
arrow::int64()); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("metrics", map_type), + arrow::field("labels", map_type), + }; + std::map options_v0 = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::BUCKET, "1"}, + {Options::BUCKET_KEY, "id"}, + {Options::FILE_SYSTEM, file_system}, + {Options::WRITE_ONLY, "true"}, + {"fields.labels.map.storage-layout", "shared-shredding"}, + {"fields.labels.map.shared-shredding.max-columns", "4"}, + }; + if (file_system == "jindo") { + options_v0 = AddOptionsForJindo(options_v0); + } + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(test_dir_, arrow::schema(fields), /*partition_keys=*/{}, + /*primary_keys=*/{}, options_v0, + /*is_streaming_mode=*/false)); + + ASSERT_OK_AND_ASSIGN(auto batch_v0, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [1, [["a", 11], ["b", 12]], [["x", 21]]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_v0), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + std::map options_v1 = options_v0; + options_v1["fields.metrics.map.storage-layout"] = "shared-shredding"; + options_v1["fields.metrics.map.shared-shredding.max-columns"] = "3"; + options_v1["fields.labels.map.storage-layout"] = "default"; + options_v1.erase("fields.labels.map.shared-shredding.max-columns"); + ASSERT_OK( + WriteNextSchema({DataField(0, fields[0]), DataField(1, fields[1]), DataField(2, fields[2])}, + /*highest_field_id=*/2, options_v1)); + + helper.reset(); + ASSERT_OK_AND_ASSIGN(helper, TestHelper::Create(PathUtil::JoinPath(test_dir_, "foo.db/bar"), + options_v1, /*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN(auto batch_v1, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [2, [["c", 31]], [["y", 41], ["z", 42]]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + 
ASSERT_OK(helper->WriteAndCommit(std::move(batch_v1), /*commit_identifier=*/1, + /*expected_commit_messages=*/std::nullopt)); + + ASSERT_OK_AND_ASSIGN(auto files, CurrentDataFiles(options_v1)); + ASSERT_EQ(2, files.size()); + ASSERT_OK_AND_ASSIGN(auto metrics_meta, ReadShreddingMeta(files[1], "metrics", options_v1)); + ASSERT_EQ(3, metrics_meta.num_columns); + ASSERT_EQ(1, metrics_meta.max_row_width); + + auto expected_type = arrow::struct_({ + arrow::field("_VALUE_KIND", arrow::int8()), + arrow::field("id", arrow::int32()), + arrow::field("metrics", map_type), + arrow::field("labels", map_type), + }); + ASSERT_OK_AND_ASSIGN(auto splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(expected_type, splits, + R"([ + [0, 1, [["a", 11], ["b", 12]], [["x", 21]]], + [0, 2, [["c", 31]], [["y", 41], ["z", 42]]] + ])")); + ASSERT_TRUE(success); +} + +TEST_P(WriteAndReadInteTest, TestMapSharedShreddingReadAfterRenameColumn) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto map_type = arrow::map(arrow::utf8(), arrow::int64()); + arrow::FieldVector fields_v0 = { + arrow::field("id", arrow::int32()), + arrow::field("metrics", map_type), + }; + std::map options_v0 = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, file_system}, + {"fields.metrics.map.storage-layout", "shared-shredding"}, + {"fields.metrics.map.shared-shredding.max-columns", "2"}, + }; + if (file_system == "jindo") { + options_v0 = AddOptionsForJindo(options_v0); + } + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(test_dir_, arrow::schema(fields_v0), + /*partition_keys=*/{}, /*primary_keys=*/{}, options_v0, + /*is_streaming_mode=*/false)); + + ASSERT_OK_AND_ASSIGN(auto batch_v0, + 
TestHelper::MakeRecordBatch(arrow::struct_(fields_v0), + R"([ + [1, [["a", 11], ["b", 12]]], + [2, [["c", 21]]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch_v0), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + arrow::FieldVector fields_v1 = { + arrow::field("id", arrow::int32()), + arrow::field("renamed_metrics", map_type), + }; + std::map options_v1 = options_v0; + options_v1.erase("fields.metrics.map.storage-layout"); + options_v1.erase("fields.metrics.map.shared-shredding.max-columns"); + options_v1["fields.renamed_metrics.map.storage-layout"] = "shared-shredding"; + options_v1["fields.renamed_metrics.map.shared-shredding.max-columns"] = "2"; + ASSERT_OK(WriteNextSchema({DataField(0, fields_v1[0]), DataField(1, fields_v1[1])}, + /*highest_field_id=*/1, options_v1)); + + helper.reset(); + ASSERT_OK_AND_ASSIGN(helper, TestHelper::Create(PathUtil::JoinPath(test_dir_, "foo.db/bar"), + options_v1, /*is_streaming_mode=*/false)); + + auto expected_type = arrow::struct_({ + arrow::field("_VALUE_KIND", arrow::int8()), + arrow::field("id", arrow::int32()), + arrow::field("renamed_metrics", map_type), + }); + ASSERT_OK_AND_ASSIGN(auto splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(expected_type, splits, + R"([ + [0, 1, [["a", 11], ["b", 12]]], + [0, 2, [["c", 21]]] + ])")); + ASSERT_TRUE(success); +} + TEST_P(WriteAndReadInteTest, TestSharedShreddingWithSchemaEvolution) { auto [file_format, file_system] = GetParam(); if (file_format != "parquet" && file_format != "orc") { @@ -1585,6 +2077,226 @@ TEST_P(WriteAndReadInteTest, TestSharedShreddingWithStructValue) { ASSERT_TRUE(success); } +TEST_P(WriteAndReadInteTest, TestMapSharedShreddingWithComplexValue) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto 
value_type = arrow::struct_({ + arrow::field("name", arrow::utf8()), + arrow::field("scores", arrow::list(arrow::int32())), + arrow::field("attrs", arrow::map(arrow::utf8(), arrow::int64())), + }); + auto map_type = arrow::map(arrow::utf8(), value_type); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("tags", map_type), + }; + std::map options = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, file_system}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "1"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(test_dir_, arrow::schema(fields), + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN(auto batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [1, [ + ["a", ["alpha", [1, 2], [["ia", 10], ["ib", 20]]]], + ["z", ["zeta", [9], [["iz", 90]]]] + ]], + [2, [ + ["a", ["amy", null, [["ia", 30]]]], + ["b", ["beta", [], []]] + ]], + [3, null] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + arrow::FieldVector expected_fields = fields; + expected_fields.insert(expected_fields.begin(), arrow::field("_VALUE_KIND", arrow::int8())); + ASSERT_OK_AND_ASSIGN(auto splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(bool full_success, + helper->ReadAndCheckResult(arrow::struct_(expected_fields), splits, + R"([ + [0, 1, [ + ["a", ["alpha", [1, 2], [["ia", 10], ["ib", 20]]]], + ["z", ["zeta", [9], [["iz", 90]]]] + ]], + [0, 2, [ + ["a", ["amy", null, [["ia", 30]]]], + ["b", ["beta", [], []]] + ]], + [0, 3, null] + 
])")); + ASSERT_TRUE(full_success); + + auto selected_keys_meta = + arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"z,a"}); + auto read_schema = arrow::schema({ + arrow::field("id", arrow::int32()), + arrow::field("tags", map_type)->WithMetadata(selected_keys_meta), + }); + auto expected_type = arrow::struct_({ + arrow::field("_VALUE_KIND", arrow::int8()), + arrow::field("id", arrow::int32()), + arrow::field("tags", map_type), + }); + ASSERT_OK_AND_ASSIGN(bool selected_success, + ReadAndCheckWithReadSchema(options, read_schema, expected_type, + R"([ + [0, 1, [ + ["z", ["zeta", [9], [["iz", 90]]]], + ["a", ["alpha", [1, 2], [["ia", 10], ["ib", 20]]]] + ]], + [0, 2, [ + ["a", ["amy", null, [["ia", 30]]]] + ]], + [0, 3, null] + ])")); + ASSERT_TRUE(selected_success); +} + +TEST_P(WriteAndReadInteTest, TestMapSharedShreddingStructValueSchemaEvolutionReadFails) { + auto [file_format, file_system] = GetParam(); + if (file_format != "parquet" && file_format != "orc") { + return; + } + + auto tag_value_type = arrow::struct_({ + arrow::field("v", arrow::int64()), + arrow::field("label", arrow::utf8()), + }); + auto profile_type = arrow::struct_({ + arrow::field("name", arrow::utf8()), + arrow::field("score", arrow::int64()), + }); + arrow::FieldVector fields = { + arrow::field("id", arrow::int32()), + arrow::field("tags", arrow::map(arrow::utf8(), tag_value_type)), + arrow::field("profile", profile_type), + }; + std::map options = { + {Options::MANIFEST_FORMAT, "avro"}, + {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "1024"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, file_system}, + {"fields.tags.map.storage-layout", "shared-shredding"}, + {"fields.tags.map.shared-shredding.max-columns", "1"}, + }; + if (file_system == "jindo") { + options = AddOptionsForJindo(options); + } + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(test_dir_, arrow::schema(fields), + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + 
/*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN(auto batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), + R"([ + [1, [["a", [10, "one"]], ["z", [11, "overflow"]]], ["alice", 100]], + [2, [["a", [20, "two"]]], ["bob", 200]] + ])", + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK(helper->WriteAndCommit(std::move(batch), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + arrow::FieldVector expected_fields = fields; + expected_fields.insert(expected_fields.begin(), arrow::field("_VALUE_KIND", arrow::int8())); + ASSERT_OK_AND_ASSIGN(auto splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(bool success, + helper->ReadAndCheckResult(arrow::struct_(expected_fields), splits, + R"([ + [0, 1, [["a", [10, "one"]], ["z", [11, "overflow"]]], ["alice", 100]], + [0, 2, [["a", [20, "two"]]], ["bob", 200]] + ])")); + ASSERT_TRUE(success); + + std::string table_path = PathUtil::JoinPath(test_dir_, "foo.db/bar"); + SchemaManager schema_manager(dir_->GetFileSystem(), table_path); + ASSERT_OK_AND_ASSIGN(auto schema_v0, schema_manager.ReadSchema(0)); + std::vector fields_v0 = schema_v0->Fields(); + + auto read_fields = [&](const std::vector& field_names) -> Status { + PAIMON_ASSIGN_OR_RAISE(auto plan, InnerScan(options)); + ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetOptions(options).SetReadFieldNames(field_names); + PAIMON_ASSIGN_OR_RAISE(auto read_context, read_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_read, TableRead::Create(std::move(read_context))); + PAIMON_ASSIGN_OR_RAISE(auto batch_reader, table_read->CreateReader(plan->Splits())); + PAIMON_ASSIGN_OR_RAISE(auto actual, ReadResultCollector::CollectResult(batch_reader.get())); + (void)actual; + return Status::OK(); + }; + + auto tag_field = fields_v0[1].ArrowField(); + auto tag_map = arrow::internal::checked_pointer_cast(tag_field->type()); + auto tag_value_struct = + 
arrow::internal::checked_pointer_cast(tag_map->item_type()); + + // Simulate alter table changing the shared-shredding MAP value struct field type. + auto changed_tag_value_type = + arrow::map(tag_map->key_type(), tag_map->item_field()->WithType(arrow::struct_({ + tag_value_struct->field(0)->WithType(arrow::utf8()), + tag_value_struct->field(1), + }))); + std::vector fields_with_changed_tag_value = fields_v0; + fields_with_changed_tag_value[1] = + DataField(fields_v0[1].Id(), tag_field->WithType(changed_tag_value_type)); + ASSERT_OK(WriteNextSchema(fields_with_changed_tag_value, schema_v0->HighestFieldId(), options)); + ASSERT_NOK_WITH_MSG(read_fields({"tags"}), + "PruneDataType does not support partial projection inside map: src " + "map> vs target " + "map>"); + + auto profile_field = fields_v0[2].ArrowField(); + auto profile_struct = + arrow::internal::checked_pointer_cast(profile_field->type()); + + // Simulate alter table renaming a nested field inside a STRUCT column. + std::vector fields_with_renamed_profile_child = fields_v0; + auto renamed_profile_type = arrow::struct_({ + profile_struct->field(0)->WithName("renamed_name"), + profile_struct->field(1), + }); + fields_with_renamed_profile_child[2] = + DataField(fields_v0[2].Id(), profile_field->WithType(renamed_profile_type)); + ASSERT_OK( + WriteNextSchema(fields_with_renamed_profile_child, schema_v0->HighestFieldId(), options)); + ASSERT_NOK_WITH_MSG(read_fields({"profile"}), + "name mismatch: read 'renamed_name' vs data 'name'"); + + // Simulate alter table changing a nested field type inside a STRUCT column. 
+ std::vector fields_with_changed_profile_child_type = fields_v0; + auto changed_profile_type = arrow::struct_({ + profile_struct->field(0), + profile_struct->field(1)->WithType(arrow::utf8()), + }); + fields_with_changed_profile_child_type[2] = + DataField(fields_v0[2].Id(), profile_field->WithType(changed_profile_type)); + ASSERT_OK(WriteNextSchema(fields_with_changed_profile_child_type, schema_v0->HighestFieldId(), + options)); + ASSERT_NOK_WITH_MSG(read_fields({"profile"}), + "PruneDataType nested field type mismatch for 'score': read string vs " + "data int64"); +} + // Keep ORC lazy dictionary decoding enabled across a default -> shared-shredding schema change. // The test inspects every user-visible batch directly, because ReadResultCollector would otherwise // decode dictionary arrays and hide a type mismatch between old and new files.