Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/paimon/core/manifest/index_manifest_file_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ std::vector<IndexManifestEntry> IndexManifestFileHandler::BucketedCombiner::Comb
index_entries.erase(entry.first);
}
for (const auto& entry : added) {
index_entries.emplace(entry.first, entry.second);
index_entries.insert_or_assign(entry.first, entry.second);
}

std::vector<IndexManifestEntry> result_entries;
Expand Down
20 changes: 20 additions & 0 deletions src/paimon/core/schema/schema_validation_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,26 @@ TEST(SchemaValidationTest, TestMapStorageLayout) {
ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema),
"not MAP<STRING, T>");
}
// Invalid: the storage-layout option names a nested path ("payload.attrs"). Only top-level
// columns can be addressed through fields.<column>.map.storage-layout, so validation is
// expected to report the dotted name as a column that does not exist in the table schema.
{
    auto attrs_type = arrow::map(arrow::utf8(), arrow::int64());
    auto payload_type = arrow::struct_({arrow::field("attrs", attrs_type)});
    arrow::FieldVector nested_fields = {f0, f1, arrow::field("payload", payload_type)};
    auto nested_schema = arrow::schema(nested_fields);

    std::map<std::string, std::string> nested_options;
    nested_options[Options::BUCKET] = "2";
    nested_options[Options::BUCKET_KEY] = "f0";
    nested_options["fields.payload.attrs.map.storage-layout"] = "shared-shredding";

    ASSERT_OK_AND_ASSIGN(
        std::shared_ptr<TableSchema> nested_table_schema,
        TableSchema::Create(/*schema_id=*/0, nested_schema, /*partition_keys=*/{},
                            /*primary_keys=*/{"f0", "f1"}, nested_options));
    ASSERT_NOK_WITH_MSG(
        SchemaValidation::ValidateTableSchema(*nested_table_schema),
        "Column 'payload.attrs' is configured with map.storage-layout but does not exist in "
        "table schema.");
}
// Valid: default layout on a MAP column
{
arrow::FieldVector fields = {f0, f1, f2};
Expand Down
116 changes: 116 additions & 0 deletions test/inte/append_compaction_inte_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
#include <string>
#include <vector>

#include "arrow/api.h"
#include "arrow/c/bridge.h"
#include "gtest/gtest.h"
#include "paimon/commit_context.h"
#include "paimon/common/data/binary_row.h"
#include "paimon/common/data/shredding/map_shared_shredding_utils.h"
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/scope_guard.h"
#include "paimon/core/append/bucketed_append_compact_manager.h"
#include "paimon/core/io/data_file_meta.h"
Expand All @@ -34,6 +37,7 @@
#include "paimon/executor.h"
#include "paimon/file_store_commit.h"
#include "paimon/file_store_write.h"
#include "paimon/format/file_format_factory.h"
#include "paimon/result.h"
#include "paimon/testing/utils/binary_row_generator.h"
#include "paimon/testing/utils/data_generator.h"
Expand Down Expand Up @@ -245,6 +249,118 @@ TEST_P(AppendCompactionInteTest, TestAppendTableStreamWriteFullCompaction) {
}
}

// Integration test: full compaction of a streaming append table whose MAP column `tags` is
// configured with the shared-shredding storage layout.
//
// Flow:
//   1. Create a single-bucket table with `fields.tags.map.storage-layout = shared-shredding`.
//   2. Write and commit three batches (rows with 2, 1, 2, null and 4 map entries).
//   3. Trigger a full compaction, commit it and verify the latest snapshot is a COMPACT commit.
//   4. Open the single compacted data file and check the shared-shredding metadata attached to
//      the `tags` field of the file schema.
//   5. Read the table back and compare against the five rows that were written.
//
// The test only applies to the parquet and orc formats; for any other parameter it is reported
// as skipped instead of silently passing.
TEST_P(AppendCompactionInteTest, TestAppendTableStreamWriteFullCompactionWithMapSharedShredding) {
    auto file_format = GetParam();
    if (file_format != "parquet" && file_format != "orc") {
        GTEST_SKIP() << "MAP shared-shredding compaction is only exercised for parquet and orc, "
                        "got file format: "
                     << file_format;
    }

    auto dir = UniqueTestDirectory::Create();
    ASSERT_TRUE(dir);
    auto map_type = arrow::map(arrow::utf8(), arrow::int64());
    arrow::FieldVector fields = {
        arrow::field("id", arrow::int32()),
        arrow::field("tags", map_type),
    };
    auto schema = arrow::schema(fields);

    std::map<std::string, std::string> options = {
        {Options::FILE_FORMAT, file_format},
        {Options::BUCKET, "1"},
        {Options::BUCKET_KEY, "id"},
        {Options::FILE_SYSTEM, "local"},
        {"fields.tags.map.storage-layout", "shared-shredding"},
        {"fields.tags.map.shared-shredding.max-columns", "64"},
    };
    ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{},
                                                         /*primary_keys=*/{}, options,
                                                         /*is_streaming_mode=*/true));

    // First commit: two rows sharing no keys.
    ASSERT_OK_AND_ASSIGN(auto batch_0,
                         TestHelper::MakeRecordBatch(arrow::struct_(fields),
                                                     R"([
[1, [["a", 10], ["b", 20]]],
[2, [["c", 30]]]
])",
                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
    int64_t commit_identifier = 0;
    ASSERT_OK(helper->WriteAndCommit(std::move(batch_0), commit_identifier++,
                                     /*expected_commit_messages=*/std::nullopt));

    // Second commit: one row re-using key "a" and one row whose map is null.
    ASSERT_OK_AND_ASSIGN(auto batch_1,
                         TestHelper::MakeRecordBatch(arrow::struct_(fields),
                                                     R"([
[3, [["a", 40], ["d", 50]]],
[4, null]
])",
                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
    ASSERT_OK(helper->WriteAndCommit(std::move(batch_1), commit_identifier++,
                                     /*expected_commit_messages=*/std::nullopt));

    // Third commit: the widest row of the test, carrying four map entries.
    ASSERT_OK_AND_ASSIGN(auto batch_2,
                         TestHelper::MakeRecordBatch(arrow::struct_(fields),
                                                     R"([
[5, [["e", 60], ["f", 70], ["g", 80], ["h", 90]]]
])",
                                                     /*partition_map=*/{}, /*bucket=*/0, {}));
    ASSERT_OK(helper->WriteAndCommit(std::move(batch_2), commit_identifier++,
                                     /*expected_commit_messages=*/std::nullopt));

    // Full compaction of bucket 0, committed as its own snapshot.
    ASSERT_OK(helper->write_->Compact(/*partition=*/{}, /*bucket=*/0,
                                      /*full_compaction=*/true));
    ASSERT_OK_AND_ASSIGN(
        std::vector<std::shared_ptr<CommitMessage>> commit_messages,
        helper->write_->PrepareCommit(/*wait_compaction=*/true, commit_identifier));
    ASSERT_FALSE(commit_messages.empty());
    ASSERT_OK(helper->commit_->Commit(commit_messages, commit_identifier));
    ASSERT_OK_AND_ASSIGN(std::optional<Snapshot> snapshot, helper->LatestSnapshot());
    ASSERT_TRUE(snapshot);
    ASSERT_EQ(Snapshot::CommitKind::Compact(), snapshot.value().GetCommitKind());

    ASSERT_OK_AND_ASSIGN(std::vector<std::shared_ptr<Split>> data_splits,
                         helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt));
    ASSERT_EQ(data_splits.size(), 1);
    {
        // check adaptive k
        auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(data_splits[0]);
        ASSERT_TRUE(data_split);
        ASSERT_EQ(data_split->DataFiles().size(), 1);
        auto compact_file = data_split->DataFiles()[0];
        std::string compact_file_path =
            PathUtil::JoinPath(data_split->BucketPath(), compact_file->file_name);
        ASSERT_OK_AND_ASSIGN(auto unique_input_stream,
                             dir->GetFileSystem()->Open(compact_file_path));
        std::shared_ptr<InputStream> input_stream(std::move(unique_input_stream));
        ASSERT_OK_AND_ASSIGN(auto file_format_obj, FileFormatFactory::Get(file_format, options));
        ASSERT_OK_AND_ASSIGN(auto reader_builder, file_format_obj->CreateReaderBuilder(10));
        ASSERT_OK_AND_ASSIGN(auto reader, reader_builder->Build(input_stream));
        ASSERT_OK_AND_ASSIGN(auto c_file_schema, reader->GetFileSchema());
        // Fail the assertion (rather than abort the whole test binary) if the import fails.
        auto file_schema_result = arrow::ImportSchema(c_file_schema.get());
        ASSERT_TRUE(file_schema_result.ok()) << file_schema_result.status().ToString();
        auto file_schema = file_schema_result.ValueOrDie();
        auto tags_field = file_schema->GetFieldByName("tags");
        ASSERT_TRUE(tags_field);
        ASSERT_TRUE(tags_field->metadata());
        ASSERT_OK_AND_ASSIGN(
            auto tags_meta,
            MapSharedShreddingUtils::DeserializeMetadata(
                tags_field->metadata()->Copy(), MapSharedShreddingDefine::kDefaultDictCompression));
        // Presumably both values follow the widest input row (id 5, four entries) rather than
        // the configured maximum of 64 -- TODO confirm against the shredding writer.
        ASSERT_EQ(4, tags_meta.num_columns);
        ASSERT_EQ(4, tags_meta.max_row_width);
    }

    // The read result carries a leading _VALUE_KIND column in front of the user fields.
    arrow::FieldVector fields_with_row_kind = fields;
    fields_with_row_kind.insert(fields_with_row_kind.begin(),
                                arrow::field("_VALUE_KIND", arrow::int8()));
    auto data_type = arrow::struct_(fields_with_row_kind);
    ASSERT_OK_AND_ASSIGN(bool success, helper->ReadAndCheckResult(data_type, data_splits,
                                                                  R"([
[0, 1, [["a", 10], ["b", 20]]],
[0, 2, [["c", 30]]],
[0, 3, [["a", 40], ["d", 50]]],
[0, 4, null],
[0, 5, [["e", 60], ["f", 70], ["g", 80], ["h", 90]]]
])"));
    ASSERT_TRUE(success);
}

TEST_P(AppendCompactionInteTest, TestAppendTableStreamWriteFullCompactionWithDv) {
auto dir = UniqueTestDirectory::Create();
ASSERT_TRUE(dir);
Expand Down
154 changes: 152 additions & 2 deletions test/inte/data_evolution_table_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "gtest/gtest.h"
#include "paimon/common/factories/io_hook.h"
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/common/utils/path_util.h"
#include "paimon/common/utils/scope_guard.h"
Expand Down Expand Up @@ -46,9 +47,10 @@ class DataEvolutionTableTest : public ::testing::Test,
dir_.reset();
}

void CreateTable(const std::vector<std::string>& partition_keys,
void CreateTable(const arrow::FieldVector& fields,
const std::vector<std::string>& partition_keys,
const std::map<std::string, std::string>& options) const {
auto schema = arrow::schema(fields_);
auto schema = arrow::schema(fields);
::ArrowSchema c_schema;
ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok());

Expand All @@ -59,6 +61,11 @@ class DataEvolutionTableTest : public ::testing::Test,
/*ignore_if_exists=*/false));
}

// Convenience overload: creates the table from `fields_` by forwarding to the CreateTable
// overload that takes an explicit field list, passing `partition_keys` and `options` through
// unchanged.
void CreateTable(const std::vector<std::string>& partition_keys,
const std::map<std::string, std::string>& options) const {
CreateTable(fields_, partition_keys, options);
}

void CreateTable(const std::vector<std::string>& partition_keys) const {
std::map<std::string, std::string> options = {{Options::MANIFEST_FORMAT, "orc"},
{Options::FILE_FORMAT, GetParam()},
Expand Down Expand Up @@ -510,6 +517,149 @@ TEST_P(DataEvolutionTableTest, TestOnlySomeColumns) {
}
}

// Integration test: a data-evolution table with two shared-shredding MAP columns (`map1`,
// `map2`) that is populated through three partial-column writes, then read back in several ways.
//
// Writes:
//   1. columns {id, map1}
//   2. columns {id, map2}, with the first row id reset to 0
//   3. column  {map1},     with the first row id reset to 0
//
// Reads verified afterwards:
//   - all columns (id and map2 come from write 2, map1 from write 3),
//   - a column-pruned read of {id, map2},
//   - a read with per-map selected keys supplied through field metadata,
//   - a row-range read of row 1 only,
//   - a read that includes the _ROW_ID and _SEQUENCE_NUMBER columns.
//
// The test only applies to the parquet and orc formats; for any other parameter it is reported
// as skipped instead of silently passing.
TEST_P(DataEvolutionTableTest, TestMultipleSharedShreddingMapsPartialOverwrite) {
    if (GetParam() != "parquet" && GetParam() != "orc") {
        GTEST_SKIP() << "shared-shredding MAP partial overwrite is only exercised for parquet and "
                        "orc, got file format: "
                     << GetParam();
    }

    auto map_type = arrow::map(arrow::utf8(), arrow::int64());
    arrow::FieldVector fields = {
        arrow::field("id", arrow::int32()),
        arrow::field("map1", map_type),
        arrow::field("map2", map_type),
    };
    std::map<std::string, std::string> options = {
        {Options::MANIFEST_FORMAT, "orc"},
        {Options::FILE_FORMAT, GetParam()},
        {Options::FILE_SYSTEM, "local"},
        {Options::ROW_TRACKING_ENABLED, "true"},
        {Options::DATA_EVOLUTION_ENABLED, "true"},
        {"fields.map1.map.storage-layout", "shared-shredding"},
        {"fields.map1.map.shared-shredding.max-columns", "1"},
        {"fields.map2.map.storage-layout", "shared-shredding"},
        {"fields.map2.map.shared-shredding.max-columns", "1"},
    };
    CreateTable(fields, /*partition_keys=*/{}, options);
    std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar");
    auto schema = arrow::schema(fields);

    // Write 1: id + map1.
    std::vector<std::string> write_cols0 = {"id", "map1"};
    auto src_array0 = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0], fields[1]}), R"([
[1, [["a", 10], ["b", 20]]],
[11, [["a", 11], ["b", 21]]]
])")
            .ValueOrDie());
    ASSERT_OK_AND_ASSIGN(auto commit_msgs0, WriteArray(table_path, write_cols0, src_array0));
    ASSERT_OK(Commit(table_path, commit_msgs0));

    // Write 2: id + map2, with the first row id reset to 0 before committing.
    std::vector<std::string> write_cols1 = {"id", "map2"};
    auto src_array1 = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0], fields[2]}), R"([
[2, [["c", 30], ["d", 40]]],
[12, [["c", 31], ["d", 41]]]
])")
            .ValueOrDie());
    ASSERT_OK_AND_ASSIGN(auto commit_msgs1, WriteArray(table_path, write_cols1, src_array1));
    SetFirstRowId(/*reset_first_row_id=*/0, commit_msgs1);
    ASSERT_OK(Commit(table_path, commit_msgs1));

    // Write 3: map1 only (keys given in a different order), again with first row id 0.
    std::vector<std::string> write_cols2 = {"map1"};
    auto src_array2 = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[1]}), R"([
[[["b", 200], ["a", 100]]],
[[["b", 201], ["a", 101]]]
])")
            .ValueOrDie());
    ASSERT_OK_AND_ASSIGN(auto commit_msgs2, WriteArray(table_path, write_cols2, src_array2));
    SetFirstRowId(/*reset_first_row_id=*/0, commit_msgs2);
    ASSERT_OK(Commit(table_path, commit_msgs2));

    // Read all columns and merge values from all partial files.
    auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
[2, [["a", 100], ["b", 200]], [["c", 30], ["d", 40]]],
[12, [["a", 101], ["b", 201]], [["c", 31], ["d", 41]]]
])")
            .ValueOrDie());
    ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_array));

    // Read a subset of columns and recall only the requested shared-shredding MAP column.
    auto expected_column_pruned_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({fields[0], fields[2]}), R"([
[2, [["c", 30], ["d", 40]]],
[12, [["c", 31], ["d", 41]]]
])")
            .ValueOrDie());
    ASSERT_OK(ScanAndRead(table_path, {"id", "map2"}, expected_column_pruned_array));

    // Read selected keys from both shared-shredding MAP columns after partial overwrite merge.
    {
        // The selected keys are passed as metadata on the MAP fields of the read schema.
        auto map1_selected_keys =
            arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"b"});
        auto map2_selected_keys =
            arrow::KeyValueMetadata::Make({DataField::MAP_SELECTED_KEYS}, {"d"});
        auto read_schema = arrow::schema({
            fields[0],
            fields[1]->WithMetadata(map1_selected_keys),
            fields[2]->WithMetadata(map2_selected_keys),
        });
        auto c_schema = std::make_unique<ArrowSchema>();
        ASSERT_TRUE(arrow::ExportSchema(*read_schema, c_schema.get()).ok());

        ScanContextBuilder scan_context_builder(table_path);
        ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish());
        ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
        ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());

        ReadContextBuilder read_context_builder(table_path);
        read_context_builder.SetReadSchema(std::move(c_schema));
        ASSERT_OK_AND_ASSIGN(std::unique_ptr<ReadContext> read_context,
                             read_context_builder.Finish());
        ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
        ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits()));
        ASSERT_OK_AND_ASSIGN(auto actual, ReadResultCollector::CollectResult(batch_reader.get()));

        // The collected result has the value-kind column in front of the requested fields.
        auto expected_type = arrow::struct_({
            SpecialFields::ValueKind().field_,
            fields[0],
            fields[1],
            fields[2],
        });
        auto expected = arrow::ipc::internal::json::ArrayFromJSON(expected_type, R"([
[0, 2, [["b", 200]], [["d", 40]]],
[0, 12, [["b", 201]], [["d", 41]]]
])")
                            .ValueOrDie();
        auto expected_chunked = std::make_shared<arrow::ChunkedArray>(expected);
        ASSERT_TRUE(expected_chunked->Equals(actual))
            << "actual=" << actual->ToString() << "\nexpected=" << expected_chunked->ToString();
    }

    // Read a subset of rows after merging values from all partial files.
    auto expected_partial_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([
[12, [["a", 101], ["b", 201]], [["c", 31], ["d", 41]]]
])")
            .ValueOrDie());
    // Uppercase `L` suffix: a lowercase `1l` is easily misread as the number eleven.
    ASSERT_OK(ScanAndRead(table_path, schema->field_names(), expected_partial_array,
                          /*predicate=*/nullptr,
                          /*row_ranges=*/{Range(1L, 1L)}));

    // Read row tracking fields and verify the latest partial overwrite sequence number.
    auto expected_row_tracking_array = std::dynamic_pointer_cast<arrow::StructArray>(
        arrow::ipc::internal::json::ArrayFromJSON(
            arrow::struct_({fields[0], fields[1], fields[2], SpecialFields::RowId().field_,
                            SpecialFields::SequenceNumber().field_}),
            R"([
[2, [["a", 100], ["b", 200]], [["c", 30], ["d", 40]], 0, 3],
[12, [["a", 101], ["b", 201]], [["c", 31], ["d", 41]], 1, 3]
])")
            .ValueOrDie());
    ASSERT_OK(ScanAndRead(table_path, {"id", "map1", "map2", "_ROW_ID", "_SEQUENCE_NUMBER"},
                          expected_row_tracking_array));
}

TEST_P(DataEvolutionTableTest, TestNullValues) {
CreateTable();
std::string table_path = PathUtil::JoinPath(dir_->Str(), "foo.db/bar");
Expand Down
Loading
Loading