Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/paimon/format/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ set(PAIMON_PARQUET_FILE_FORMAT
predicate_converter.cpp
file_reader_wrapper.cpp
page_filtered_row_group_reader.cpp
parquet_timestamp_converter.cpp
parquet_timestamp_binary_converter.cpp
parquet_file_batch_reader.cpp
parquet_file_format_factory.cpp
parquet_format_writer.cpp
Expand Down Expand Up @@ -53,7 +53,7 @@ if(PAIMON_BUILD_TESTS)
SOURCES
file_reader_wrapper_test.cpp
page_filtered_row_group_reader_test.cpp
parquet_timestamp_converter_test.cpp
parquet_timestamp_binary_converter_test.cpp
parquet_field_id_converter_test.cpp
parquet_file_batch_reader_test.cpp
parquet_format_writer_test.cpp
Expand Down
18 changes: 11 additions & 7 deletions src/paimon/format/parquet/parquet_file_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
#include "paimon/core/schema/arrow_schema_validator.h"
#include "paimon/format/parquet/parquet_field_id_converter.h"
#include "paimon/format/parquet/parquet_format_defs.h"
#include "paimon/format/parquet/parquet_timestamp_converter.h"
#include "paimon/format/parquet/parquet_timestamp_binary_converter.h"
#include "paimon/format/parquet/predicate_converter.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/utils/roaring_bitmap32.h"
Expand Down Expand Up @@ -111,7 +111,7 @@ Result<std::unique_ptr<::ArrowSchema>> ParquetFileBatchReader::GetFileSchema() c
ParquetFieldIdConverter::GetPaimonIdsFromParquetIds(file_schema));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<arrow::DataType> new_type,
ParquetTimestampConverter::AdjustTimezone(arrow::struct_(new_schema->fields())));
ParquetTimestampBinaryConverter::AdjustTimezone(arrow::struct_(new_schema->fields())));

auto c_schema = std::make_unique<::ArrowSchema>();
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*new_type, c_schema.get()));
Expand Down Expand Up @@ -348,14 +348,18 @@ Result<BatchReader::ReadBatch> ParquetFileBatchReader::NextBatch() {
}
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> array,
batch->ToStructArray());
PAIMON_ASSIGN_OR_RAISE(bool need_cast, ParquetTimestampConverter::NeedCastArrayForTimestamp(
array->type(), read_data_type_));
// Reconcile the read array to the read schema in one traversal: timestamp timezone, and
// inline blob descriptors read as BINARY widened to the LARGE_BINARY read type.
PAIMON_ASSIGN_OR_RAISE(bool need_cast,
ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
array->type(), read_data_type_));
if (need_cast) {
PAIMON_ASSIGN_OR_RAISE(array, ParquetTimestampConverter::CastArrayForTimestamp(
PAIMON_ASSIGN_OR_RAISE(array, ParquetTimestampBinaryConverter::CastArrayForTimestamp(
array, read_data_type_, arrow_pool_));
}
PAIMON_ASSIGN_OR_RAISE(need_cast, ParquetTimestampConverter::NeedCastArrayForTimestamp(
array->type(), read_data_type_));
PAIMON_ASSIGN_OR_RAISE(need_cast,
ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
array->type(), read_data_type_));
if (need_cast) {
return Status::Invalid(fmt::format(
"unexpected: in parquet, after CastArrayForTimestamp, output type {} not "
Expand Down
42 changes: 42 additions & 0 deletions src/paimon/format/parquet/parquet_file_batch_reader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,48 @@ TEST_F(ParquetFileBatchReaderTest, TestReadBinaryWrittenFromBinaryAndLargeBinary
check_binary_read_result(arrow::large_binary(), "large-binary.parquet");
}

// Inline blob descriptors (blob-descriptor-field) are written to parquet as BINARY, but the
// blob column is read back through a LARGE_BINARY read schema. The reader must widen
// BINARY -> LARGE_BINARY for that column -- leaving non-blob columns untouched -- instead of
// rejecting the type difference in the timestamp type-equality check.
TEST_F(ParquetFileBatchReaderTest, TestReadBinaryColumnWithLargeBinaryReadSchema) {
std::string data_json = R"([
[1, "descriptor-1"],
[2, ""],
[3, null],
[4, "descriptor-2"]
])";
auto id_field = arrow::field("id", arrow::int32());
auto photo_binary_field = arrow::field("photo", arrow::binary());
auto write_schema = arrow::schema({id_field, photo_binary_field});
auto write_array = std::dynamic_pointer_cast<arrow::StructArray>(
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({id_field, photo_binary_field}),
data_json)
.ValueOrDie());

std::string file_path = PathUtil::JoinPath(dir_->Str(), "binary-to-large-binary.parquet");
WriteArray(file_path, write_array, write_schema, /*write_batch_size=*/write_array->length(),
/*enable_dictionary=*/false, /*max_row_group_length=*/write_array->length());

// Read schema asks for LARGE_BINARY on the blob column; the non-blob `id` stays int32.
auto photo_large_binary_field = arrow::field("photo", arrow::large_binary());
auto read_schema = arrow::schema({id_field, photo_large_binary_field});
auto parquet_batch_reader =
PrepareParquetFileBatchReader(file_path, read_schema, /*predicate=*/nullptr,
/*selection_bitmap=*/std::nullopt, batch_size_);

ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult(
parquet_batch_reader.get()));

// The blob column is now LARGE_BINARY with the original values; `id` is unchanged.
auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
arrow::ipc::internal::json::ArrayFromJSON(
arrow::struct_({id_field, photo_large_binary_field}), data_json)
.ValueOrDie());
auto expected_chunked_array = std::make_shared<arrow::ChunkedArray>(expected_array);
ASSERT_TRUE(result_array->Equals(expected_chunked_array));
}

TEST_F(ParquetFileBatchReaderTest, TestSimple) {
std::string file_name = paimon::test::GetDataDir() +
"/parquet/parquet_append_table.db/parquet_append_table/bucket-0/"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,52 @@
* limitations under the License.
*/

#include "paimon/format/parquet/parquet_timestamp_converter.h"
#include "paimon/format/parquet/parquet_timestamp_binary_converter.h"

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "arrow/array/array_binary.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/type.h"
#include "fmt/format.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/common/utils/date_time_utils.h"
#include "paimon/core/casting/timestamp_to_timestamp_cast_executor.h"

namespace paimon::parquet {
Result<std::shared_ptr<arrow::DataType>> ParquetTimestampConverter::AdjustTimezone(

namespace {

// Widen a BINARY array to LARGE_BINARY: rebuild the 32-bit offsets as int64 and reuse the
// value/null buffers (only the offset buffer is newly allocated, no value data is copied).
Result<std::shared_ptr<arrow::Array>> WidenBinaryToLargeBinary(
const std::shared_ptr<arrow::BinaryArray>& binary_array, arrow::MemoryPool* pool) {
if (binary_array->offset() != 0) {
return Status::Invalid("WidenBinaryToLargeBinary only supports zero-offset arrays");
}
const int64_t length = binary_array->length();
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
std::shared_ptr<arrow::Buffer> large_offsets,
arrow::AllocateBuffer((length + 1) * static_cast<int64_t>(sizeof(int64_t)), pool));
auto* out = reinterpret_cast<int64_t*>(large_offsets->mutable_data());
for (int64_t i = 0; i <= length; ++i) {
out[i] = binary_array->value_offset(i);
}
std::shared_ptr<arrow::Buffer> null_bitmap =
binary_array->null_count() == 0 ? nullptr : binary_array->null_bitmap();
auto data = arrow::ArrayData::Make(arrow::large_binary(), length,
{null_bitmap, large_offsets, binary_array->value_data()},
binary_array->null_count());
return arrow::MakeArray(data);
}

} // namespace

Result<std::shared_ptr<arrow::DataType>> ParquetTimestampBinaryConverter::AdjustTimezone(
const std::shared_ptr<arrow::DataType>& src_data_type) {
arrow::Type::type type = src_data_type->id();
switch (type) {
Expand Down Expand Up @@ -73,11 +105,17 @@ Result<std::shared_ptr<arrow::DataType>> ParquetTimestampConverter::AdjustTimezo
}
}

Result<bool> ParquetTimestampConverter::NeedCastArrayForTimestamp(
Result<bool> ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
const std::shared_ptr<arrow::DataType>& src_data_type,
const std::shared_ptr<arrow::DataType>& target_data_type) {
arrow::Type::type type = src_data_type->id();
if (type != target_data_type->id()) {
// Inline blob descriptors are read as parquet BINARY while the BLOB column's read type is
// LARGE_BINARY; that is a legal widening (handled by CastArrayForTimestamp), not a mismatch.
if (type == arrow::Type::type::BINARY &&
target_data_type->id() == arrow::Type::type::LARGE_BINARY) {
return true;
}
return Status::Invalid(fmt::format("src type {} and target type {} mismatch",
src_data_type->ToString(),
target_data_type->ToString()));
Expand Down Expand Up @@ -145,11 +183,17 @@ Result<bool> ParquetTimestampConverter::NeedCastArrayForTimestamp(
}
}

Result<std::shared_ptr<arrow::Array>> ParquetTimestampConverter::CastArrayForTimestamp(
Result<std::shared_ptr<arrow::Array>> ParquetTimestampBinaryConverter::CastArrayForTimestamp(
const std::shared_ptr<arrow::Array>& array,
const std::shared_ptr<arrow::DataType>& target_data_type,
const std::shared_ptr<arrow::MemoryPool>& arrow_pool) {
arrow::Type::type type = array->type()->id();
// Inline blob descriptor column: read as BINARY, widen to the LARGE_BINARY read type.
if (type == arrow::Type::type::BINARY &&
target_data_type->id() == arrow::Type::type::LARGE_BINARY) {
return WidenBinaryToLargeBinary(std::static_pointer_cast<arrow::BinaryArray>(array),
arrow_pool.get());
}
switch (type) {
case arrow::Type::type::STRUCT: {
auto* struct_array = arrow::internal::checked_cast<arrow::StructArray*>(array.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@

namespace paimon::parquet {

class ParquetTimestampConverter {
// Reconciles a parquet-read array with the read schema for the cases where the parquet physical
// type legally differs from the read type, in a single per-batch traversal:
// - timestamp timezone/unit;
// - inline blob descriptors, stored as parquet BINARY but read as LARGE_BINARY (BLOB column).
class ParquetTimestampBinaryConverter {
public:
ParquetTimestampConverter() = delete;
~ParquetTimestampConverter() = delete;
ParquetTimestampBinaryConverter() = delete;
~ParquetTimestampBinaryConverter() = delete;

static Result<std::shared_ptr<arrow::DataType>> AdjustTimezone(
const std::shared_ptr<arrow::DataType>& src_data_type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "paimon/format/parquet/parquet_timestamp_converter.h"
#include "paimon/format/parquet/parquet_timestamp_binary_converter.h"

#include <memory>

Expand All @@ -28,7 +28,7 @@

namespace paimon::parquet::test {

TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {
TEST(ParquetTimestampBinaryConverterTest, TestNeedCastArrayForTimestamp) {
{
// single field need cast
arrow::FieldVector fields = {
Expand All @@ -38,7 +38,7 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {
arrow::field("f0", arrow::timestamp(arrow::TimeUnit::NANO, "UTC")),
};
ASSERT_OK_AND_ASSIGN(bool need_cast,
ParquetTimestampConverter::NeedCastArrayForTimestamp(
ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
arrow::struct_(fields), arrow::struct_(target_fields)));
ASSERT_TRUE(need_cast);
}
Expand All @@ -50,7 +50,7 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {
arrow::FieldVector target_fields = {
arrow::field("f2", arrow::list(arrow::timestamp(arrow::TimeUnit::SECOND)))};
ASSERT_OK_AND_ASSIGN(bool need_cast,
ParquetTimestampConverter::NeedCastArrayForTimestamp(
ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
arrow::struct_(fields), arrow::struct_(target_fields)));
ASSERT_TRUE(need_cast);
}
Expand All @@ -64,7 +64,7 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {
arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::SECOND),
arrow::timestamp(arrow::TimeUnit::NANO, "UTC")))};
ASSERT_OK_AND_ASSIGN(bool need_cast,
ParquetTimestampConverter::NeedCastArrayForTimestamp(
ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
arrow::struct_(fields), arrow::struct_(target_fields)));
ASSERT_TRUE(need_cast);
}
Expand All @@ -82,13 +82,13 @@ TEST(ParquetTimestampConverterTest, TestNeedCastArrayForTimestamp) {
arrow::field("f1", arrow::timestamp(arrow::TimeUnit::NANO, "UTC"))})),
};
ASSERT_OK_AND_ASSIGN(bool need_cast,
ParquetTimestampConverter::NeedCastArrayForTimestamp(
ParquetTimestampBinaryConverter::NeedCastArrayForTimestamp(
arrow::struct_(fields), arrow::struct_(target_fields)));
ASSERT_TRUE(need_cast);
}
}

TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) {
TEST(ParquetTimestampBinaryConverterTest, TestCastArrayForTimestamp) {
auto timezone = DateTimeUtils::GetLocalTimezoneName();
arrow::FieldVector fields = {
arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
Expand Down Expand Up @@ -121,7 +121,7 @@ TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) {

std::shared_ptr<arrow::MemoryPool> pool = GetArrowPool(GetDefaultPool());
ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::Array> result_array,
ParquetTimestampConverter::CastArrayForTimestamp(
ParquetTimestampBinaryConverter::CastArrayForTimestamp(
array, arrow::struct_(target_fields), pool));

auto expected_array = std::dynamic_pointer_cast<arrow::StructArray>(
Expand All @@ -134,7 +134,7 @@ TEST(ParquetTimestampConverterTest, TestCastArrayForTimestamp) {
ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString();
}

TEST(ParquetTimestampConverterTest, TestAdjustTimezone) {
TEST(ParquetTimestampBinaryConverterTest, TestAdjustTimezone) {
auto timezone = DateTimeUtils::GetLocalTimezoneName();
arrow::FieldVector fields = {
arrow::field("f1", arrow::map(arrow::timestamp(arrow::TimeUnit::MILLI),
Expand All @@ -159,7 +159,7 @@ TEST(ParquetTimestampConverterTest, TestAdjustTimezone) {
};

ASSERT_OK_AND_ASSIGN(auto result_type,
ParquetTimestampConverter::AdjustTimezone(arrow::struct_(fields)));
ParquetTimestampBinaryConverter::AdjustTimezone(arrow::struct_(fields)));
ASSERT_TRUE(result_type->Equals(arrow::struct_(target_fields)));
}
} // namespace paimon::parquet::test
Loading