diff --git a/src/paimon/format/parquet/CMakeLists.txt b/src/paimon/format/parquet/CMakeLists.txt index 6e72e0c69..d35c8190f 100644 --- a/src/paimon/format/parquet/CMakeLists.txt +++ b/src/paimon/format/parquet/CMakeLists.txt @@ -17,7 +17,7 @@ set(PAIMON_PARQUET_FILE_FORMAT predicate_converter.cpp file_reader_wrapper.cpp page_filtered_row_group_reader.cpp - parquet_timestamp_converter.cpp + parquet_timestamp_binary_converter.cpp parquet_file_batch_reader.cpp parquet_file_format_factory.cpp parquet_format_writer.cpp @@ -53,7 +53,7 @@ if(PAIMON_BUILD_TESTS) SOURCES file_reader_wrapper_test.cpp page_filtered_row_group_reader_test.cpp - parquet_timestamp_converter_test.cpp + parquet_timestamp_binary_converter_test.cpp parquet_field_id_converter_test.cpp parquet_file_batch_reader_test.cpp parquet_format_writer_test.cpp diff --git a/src/paimon/format/parquet/parquet_file_batch_reader.cpp b/src/paimon/format/parquet/parquet_file_batch_reader.cpp index 6c1b3f892..bf11f58de 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader.cpp @@ -42,7 +42,7 @@ #include "paimon/core/schema/arrow_schema_validator.h" #include "paimon/format/parquet/parquet_field_id_converter.h" #include "paimon/format/parquet/parquet_format_defs.h" -#include "paimon/format/parquet/parquet_timestamp_converter.h" +#include "paimon/format/parquet/parquet_timestamp_binary_converter.h" #include "paimon/format/parquet/predicate_converter.h" #include "paimon/reader/batch_reader.h" #include "paimon/utils/roaring_bitmap32.h" @@ -111,7 +111,7 @@ Result> ParquetFileBatchReader::GetFileSchema() c ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(file_schema)); PAIMON_ASSIGN_OR_RAISE( std::shared_ptr new_type, - ParquetTimestampConverter::AdjustTimezone(arrow::struct_(new_schema->fields()))); + ParquetTimestampBinaryConverter::AdjustTimezone(arrow::struct_(new_schema->fields()))); auto c_schema = std::make_unique<::ArrowSchema>(); PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*new_type, c_schema.get())); @@ -348,14 +348,18 @@ Result ParquetFileBatchReader::NextBatch() { } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, batch->ToStructArray()); - PAIMON_ASSIGN_OR_RAISE(bool need_cast, ParquetTimestampConverter::NeedCastArrayForTimestamp( - array->type(), read_data_type_)); + // Reconcile the read array to the read schema in one traversal: timestamp timezone, and + // inline blob descriptors read as BINARY widened to the LARGE_BINARY read type. + PAIMON_ASSIGN_OR_RAISE(bool need_cast, + ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( + array->type(), read_data_type_)); if (need_cast) { - PAIMON_ASSIGN_OR_RAISE(array, ParquetTimestampConverter::CastArrayForTimestamp( + PAIMON_ASSIGN_OR_RAISE(array, ParquetTimestampBinaryConverter::CastArrayForTimestamp( array, read_data_type_, arrow_pool_)); } - PAIMON_ASSIGN_OR_RAISE(need_cast, ParquetTimestampConverter::NeedCastArrayForTimestamp( - array->type(), read_data_type_)); + PAIMON_ASSIGN_OR_RAISE(need_cast, + ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( + array->type(), read_data_type_)); if (need_cast) { return Status::Invalid(fmt::format( "unexpected: in parquet, after CastArrayForTimestamp, output type {} not " diff --git a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp index 7fa2da624..124dcc05c 100644 --- a/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp +++ b/src/paimon/format/parquet/parquet_file_batch_reader_test.cpp @@ -326,6 +326,48 @@ TEST_F(ParquetFileBatchReaderTest, TestReadBinaryWrittenFromBinaryAndLargeBinary check_binary_read_result(arrow::large_binary(), "large-binary.parquet"); } +// Inline blob descriptors (blob-descriptor-field) are written to parquet as BINARY, but the +// blob column is read back through a LARGE_BINARY read schema. The reader must widen +// BINARY -> LARGE_BINARY for that column -- leaving non-blob columns untouched -- instead of +// rejecting the type difference in the timestamp type-equality check. +TEST_F(ParquetFileBatchReaderTest, TestReadBinaryColumnWithLargeBinaryReadSchema) { + std::string data_json = R"([ + [1, "descriptor-1"], + [2, ""], + [3, null], + [4, "descriptor-2"] + ])"; + auto id_field = arrow::field("id", arrow::int32()); + auto photo_binary_field = arrow::field("photo", arrow::binary()); + auto write_schema = arrow::schema({id_field, photo_binary_field}); + auto write_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({id_field, photo_binary_field}), + data_json) + .ValueOrDie()); + + std::string file_path = PathUtil::JoinPath(dir_->Str(), "binary-to-large-binary.parquet"); + WriteArray(file_path, write_array, write_schema, /*write_batch_size=*/write_array->length(), + /*enable_dictionary=*/false, /*max_row_group_length=*/write_array->length()); + + // Read schema asks for LARGE_BINARY on the blob column; the non-blob `id` stays int32. + auto photo_large_binary_field = arrow::field("photo", arrow::large_binary()); + auto read_schema = arrow::schema({id_field, photo_large_binary_field}); + auto parquet_batch_reader = + PrepareParquetFileBatchReader(file_path, read_schema, /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt, batch_size_); + + ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult( + parquet_batch_reader.get())); + + // The blob column is now LARGE_BINARY with the original values; `id` is unchanged. + auto expected_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON( + arrow::struct_({id_field, photo_large_binary_field}), data_json) + .ValueOrDie()); + auto expected_chunked_array = std::make_shared(expected_array); + ASSERT_TRUE(result_array->Equals(expected_chunked_array)); +} + TEST_F(ParquetFileBatchReaderTest, TestSimple) { std::string file_name = paimon::test::GetDataDir() + "/parquet/parquet_append_table.db/parquet_append_table/bucket-0/" diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.cpp b/src/paimon/format/parquet/parquet_timestamp_binary_converter.cpp similarity index 82% rename from src/paimon/format/parquet/parquet_timestamp_converter.cpp rename to src/paimon/format/parquet/parquet_timestamp_binary_converter.cpp index f12ad4cc7..eb49debc6 100644 --- a/src/paimon/format/parquet/parquet_timestamp_converter.cpp +++ b/src/paimon/format/parquet/parquet_timestamp_binary_converter.cpp @@ -14,12 +14,16 @@ * limitations under the License. */ -#include "paimon/format/parquet/parquet_timestamp_converter.h" +#include "paimon/format/parquet/parquet_timestamp_binary_converter.h" +#include #include #include #include +#include "arrow/array/array_binary.h" +#include "arrow/array/util.h" +#include "arrow/buffer.h" #include "arrow/type.h" #include "fmt/format.h" #include "paimon/common/utils/arrow/status_utils.h" @@ -27,7 +31,35 @@ #include "paimon/core/casting/timestamp_to_timestamp_cast_executor.h" namespace paimon::parquet { -Result> ParquetTimestampConverter::AdjustTimezone( + +namespace { + +// Widen a BINARY array to LARGE_BINARY: rebuild the 32-bit offsets as int64 and reuse the +// value/null buffers (only the offset buffer is newly allocated, no value data is copied). +Result> WidenBinaryToLargeBinary( + const std::shared_ptr& binary_array, arrow::MemoryPool* pool) { + if (binary_array->offset() != 0) { + return Status::Invalid("WidenBinaryToLargeBinary only supports zero-offset arrays"); + } + const int64_t length = binary_array->length(); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr large_offsets, + arrow::AllocateBuffer((length + 1) * static_cast(sizeof(int64_t)), pool)); + auto* out = reinterpret_cast(large_offsets->mutable_data()); + for (int64_t i = 0; i <= length; ++i) { + out[i] = binary_array->value_offset(i); + } + std::shared_ptr null_bitmap = + binary_array->null_count() == 0 ? nullptr : binary_array->null_bitmap(); + auto data = arrow::ArrayData::Make(arrow::large_binary(), length, + {null_bitmap, large_offsets, binary_array->value_data()}, + binary_array->null_count()); + return arrow::MakeArray(data); +} + +} // namespace + +Result> ParquetTimestampBinaryConverter::AdjustTimezone( const std::shared_ptr& src_data_type) { arrow::Type::type type = src_data_type->id(); switch (type) { @@ -73,11 +105,17 @@ Result> ParquetTimestampConverter::AdjustTimezo } } -Result ParquetTimestampConverter::NeedCastArrayForTimestamp( +Result ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( const std::shared_ptr& src_data_type, const std::shared_ptr& target_data_type) { arrow::Type::type type = src_data_type->id(); if (type != target_data_type->id()) { + // Inline blob descriptors are read as parquet BINARY while the BLOB column's read type is + // LARGE_BINARY; that is a legal widening (handled by CastArrayForTimestamp), not a mismatch. + if (type == arrow::Type::type::BINARY && + target_data_type->id() == arrow::Type::type::LARGE_BINARY) { + return true; + } return Status::Invalid(fmt::format("src type {} and target type {} mismatch", src_data_type->ToString(), target_data_type->ToString())); @@ -145,11 +183,17 @@ Result ParquetTimestampConverter::NeedCastArrayForTimestamp( } } -Result> ParquetTimestampConverter::CastArrayForTimestamp( +Result> ParquetTimestampBinaryConverter::CastArrayForTimestamp( const std::shared_ptr& array, const std::shared_ptr& target_data_type, const std::shared_ptr& arrow_pool) { arrow::Type::type type = array->type()->id(); + // Inline blob descriptor column: read as BINARY, widen to the LARGE_BINARY read type. + if (type == arrow::Type::type::BINARY && + target_data_type->id() == arrow::Type::type::LARGE_BINARY) { + return WidenBinaryToLargeBinary(std::static_pointer_cast(array), + arrow_pool.get()); + } switch (type) { case arrow::Type::type::STRUCT: { auto* struct_array = arrow::internal::checked_cast(array.get()); diff --git a/src/paimon/format/parquet/parquet_timestamp_converter.h b/src/paimon/format/parquet/parquet_timestamp_binary_converter.h similarity index 75% rename from src/paimon/format/parquet/parquet_timestamp_converter.h rename to src/paimon/format/parquet/parquet_timestamp_binary_converter.h index b1283adcb..54b930fd7 100644 --- a/src/paimon/format/parquet/parquet_timestamp_converter.h +++ b/src/paimon/format/parquet/parquet_timestamp_binary_converter.h @@ -23,10 +23,14 @@ namespace paimon::parquet { -class ParquetTimestampConverter { +// Reconciles a parquet-read array with the read schema for the cases where the parquet physical +// type legally differs from the read type, in a single per-batch traversal: +// - timestamp timezone/unit; +// - inline blob descriptors, stored as parquet BINARY but read as LARGE_BINARY (BLOB column). +class ParquetTimestampBinaryConverter { public: - ParquetTimestampConverter() = delete; - ~ParquetTimestampConverter() = delete; + ParquetTimestampBinaryConverter() = delete; + ~ParquetTimestampBinaryConverter() = delete; static Result> AdjustTimezone( const std::shared_ptr& src_data_type); diff --git a/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp b/src/paimon/format/parquet/parquet_timestamp_binary_converter_test.cpp similarity index 90% rename from src/paimon/format/parquet/parquet_timestamp_converter_test.cpp rename to src/paimon/format/parquet/parquet_timestamp_binary_converter_test.cpp index 7490ddfbc..617343022 100644 --- a/src/paimon/format/parquet/parquet_timestamp_converter_test.cpp +++ b/src/paimon/format/parquet/parquet_timestamp_binary_converter_test.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "paimon/format/parquet/parquet_timestamp_converter.h" +#include "paimon/format/parquet/parquet_timestamp_binary_converter.h" #include @@ -28,7 +28,7 @@ namespace paimon::parquet::test { -TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) { +TEST(ParquetTimestampBinaryConverterTest, TestNeedCastArrayForTimestamp) { { // single field need cast arrow::FieldVector fields = { @@ -38,7 +38,7 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) { arrow::field("f0", arrow::timestamp(arrow::TimeUnit::NANO, "UTC")), }; ASSERT_OK_AND_ASSIGN(bool need_cast, - ParquetTimestampConverter::NeedCastArrayForTimestamp( + ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( arrow::struct_(fields), arrow::struct_(target_fields))); ASSERT_TRUE(need_cast); } @@ -50,7 +50,7 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) { arrow::FieldVector target_fields = { arrow::field("f2", arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND)))}; ASSERT_OK_AND_ASSIGN(bool need_cast, - ParquetTimestampConverter::NeedCastArrayForTimestamp( + ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( arrow::struct_(fields), arrow::struct_(target_fields))); ASSERT_TRUE(need_cast); } @@ -64,7 +64,7 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) { arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::SECOND), arrow::timestamp(arrow::TimeUnit::NANO, "UTC")))}; ASSERT_OK_AND_ASSIGN(bool need_cast, - ParquetTimestampConverter::NeedCastArrayForTimestamp( + ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( arrow::struct_(fields), arrow::struct_(target_fields))); ASSERT_TRUE(need_cast); } @@ -82,13 +82,13 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) { arrow::field("f1", arrow::timestamp(arrow::TimeUnit::NANO, "UTC"))})), }; ASSERT_OK_AND_ASSIGN(bool need_cast, - ParquetTimestampConverter::NeedCastArrayForTimestamp( + ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp( arrow::struct_(fields), arrow::struct_(target_fields))); ASSERT_TRUE(need_cast); } } -TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) { +TEST(ParquetTimestampBinaryConverterTest, TestCastArrayForTimestamp) { auto timezone = DateTimeUtils::GetLocalTimezoneName(); arrow::FieldVector fields = { arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI), @@ -121,7 +121,7 @@ TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) { std::shared_ptr pool = GetArrowPool(GetDefaultPool()); ASSERT_OK_AND_ASSIGN(std::shared_ptr result_array, - ParquetTimestampConverter::CastArrayForTimestamp( + ParquetTimestampBinaryConverter::CastArrayForTimestamp( array, arrow::struct_(target_fields), pool)); auto expected_array = std::dynamic_pointer_cast( @@ -134,7 +134,7 @@ TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) { ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString(); } -TEST(ParquetTimestampConverterTest, TestAdjustTimezone) { +TEST(ParquetTimestampBinaryConverterTest, TestAdjustTimezone) { auto timezone = DateTimeUtils::GetLocalTimezoneName(); arrow::FieldVector fields = { arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI), @@ -159,7 +159,7 @@ TEST(ParquetTimestampConverterTest, TestAdjustTimezone) { }; ASSERT_OK_AND_ASSIGN(auto result_type, - ParquetTimestampConverter::AdjustTimezone(arrow::struct_(fields))); + ParquetTimestampBinaryConverter::AdjustTimezone(arrow::struct_(fields))); ASSERT_TRUE(result_type->Equals(arrow::struct_(target_fields))); } } // namespace paimon::parquet::test