Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion cpp/src/parquet/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,26 @@ class PlainFLBADecoder : public PlainDecoder<FLBAType>, public FLBADecoder {
public:
using Base = PlainDecoder<FLBAType>;
using Base::PlainDecoder;
using Base::Decode; // keep Decode(FixedLenByteArray*, int)

// Dense decode for PLAIN-encoded FLBA. The values are read as one contiguous
// run of type_length_-sized entries starting at data_, so the whole batch is
// transferred into the caller's buffer with a single memcpy.
//
// At most num_values_ values are decoded. ParquetException::EofException() is
// invoked when the batch would need more bytes than len_ (or more than
// INT_MAX). Returns the number of values decoded.
int Decode(uint8_t* buffer, int max_values) override {
  const int num_to_decode = std::min(max_values, this->num_values_);
  // Widen before multiplying so the byte count cannot overflow an int.
  const int64_t byte_count = static_cast<int64_t>(this->type_length_) * num_to_decode;
  const bool overruns_page = byte_count > this->len_;
  if (overruns_page || byte_count > INT_MAX) {
    ParquetException::EofException();
  }
  // Nothing to copy for an empty batch.
  if (byte_count > 0) {
    memcpy(buffer, this->data_, static_cast<size_t>(byte_count));
  }
  // Advance the read cursor past what was just consumed.
  this->num_values_ -= num_to_decode;
  this->len_ -= static_cast<int>(byte_count);
  this->data_ += byte_count;
  return num_to_decode;
}
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -1212,6 +1232,35 @@ int DictDecoderImpl<FLBAType>::DecodeArrow(
return num_values - null_count;
}

// FIXED_LEN_BYTE_ARRAY dictionary decoder with a dense output path.
//
// Combines DictDecoderImpl<FLBAType> with the FLBADecoder interface so that
// dictionary-encoded values can be written straight into a caller-owned,
// densely packed byte buffer (the same dense Decode overload that the
// DeltaByteArray and ByteStreamSplit FLBA decoders provide).
class DictFLBADecoder : public DictDecoderImpl<FLBAType>, public FLBADecoder {
 public:
  using Base = DictDecoderImpl<FLBAType>;
  using Base::DictDecoderImpl;
  using Base::Decode;  // keep Decode(FixedLenByteArray*, int)

  // Decodes up to max_values values (capped at num_values_). For each value one
  // dictionary index is read, bounds-checked, and the referenced entry's
  // type_length_ bytes are appended to `buffer`. No null handling is done here.
  //
  // Throws ParquetException when an index cannot be read; an out-of-range index
  // is reported through PARQUET_THROW_NOT_OK. Returns the number of values
  // decoded.
  int Decode(uint8_t* buffer, int max_values) override {
    const int num_to_decode = std::min(max_values, this->num_values_);
    const int64_t value_width = this->type_length_;
    const auto* entries = this->dictionary_->data_as<FLBA>();

    // Running write position inside the caller's buffer.
    uint8_t* out = buffer;
    for (int remaining = num_to_decode; remaining > 0; --remaining) {
      int32_t dict_index;
      if (ARROW_PREDICT_FALSE(!this->idx_decoder_.Get(&dict_index))) {
        throw ParquetException("Dict decoding failed");
      }
      PARQUET_THROW_NOT_OK(this->IndexInBounds(dict_index));
      memcpy(out, entries[dict_index].ptr, static_cast<size_t>(value_width));
      out += value_width;
    }

    this->num_values_ -= num_to_decode;
    return num_to_decode;
  }
};
template <typename Type>
int DictDecoderImpl<Type>::DecodeArrow(
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
Expand Down Expand Up @@ -2232,6 +2281,23 @@ class DeltaByteArrayFLBADecoder : public DeltaByteArrayDecoderImpl<FLBAType>,
}
return decoded_values_size;
}

// Same internal decode as above, but copy the bytes contiguously into the
// caller's buffer instead of materializing per-value pointers.
int Decode(uint8_t* buffer, int max_values) override {
std::vector<ByteArray> decode_byte_array(max_values);
const int decoded_values_size = GetInternal(decode_byte_array.data(), max_values);
const uint32_t type_length = static_cast<uint32_t>(this->type_length_);

for (int i = 0; i < decoded_values_size; i++) {
if (ARROW_PREDICT_FALSE(decode_byte_array[i].len != type_length)) {
throw ParquetException("Fixed length byte array length mismatch");
}
memcpy(buffer + static_cast<int64_t>(i) * type_length, decode_byte_array[i].ptr,
type_length);
}
return decoded_values_size;
}
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -2370,10 +2436,23 @@ class ByteStreamSplitDecoder<FLBAType> : public ByteStreamSplitDecoderBase<FLBAT
}
return num_decoded;
}

// Dense decode: forwards to DecodeRaw with the caller's buffer as the
// destination, so no intermediate scratch buffer is allocated here.
// NOTE(review): presumably DecodeRaw reassembles the split byte streams into
// contiguous values - confirm against its definition.
int Decode(uint8_t* buffer, int max_values) override {
  const int num_decoded = this->DecodeRaw(buffer, max_values);
  return num_decoded;
}
};

} // namespace

// Fallback for the dense (densely packed) FLBA decode. Decoders that support
// it override this method; landing here means the encoding in use has no dense
// implementation, so fail with an explicit message instead of decoding.
int FLBADecoder::Decode(uint8_t* /*buffer*/, int /*max_values*/) {
  const char* const message =
      "Dense FIXED_LEN_BYTE_ARRAY decoding is not implemented for this encoding";
  throw ParquetException(message);
}

// ----------------------------------------------------------------------
// Factory functions

Expand Down Expand Up @@ -2475,7 +2554,7 @@ std::unique_ptr<Decoder> MakeDictDecoder(Type::type type_num,
case Type::BYTE_ARRAY:
return std::make_unique<DictByteArrayDecoderImpl>(descr, pool);
case Type::FIXED_LEN_BYTE_ARRAY:
return std::make_unique<DictDecoderImpl<FLBAType>>(descr, pool);
return std::make_unique<DictFLBADecoder>(descr, pool);
default:
break;
}
Expand Down
19 changes: 15 additions & 4 deletions cpp/src/parquet/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,23 @@ class BooleanDecoder : virtual public TypedDecoder<BooleanType> {

/// \brief Decoder interface for FIXED_LEN_BYTE_ARRAY columns
///
/// Extends TypedDecoder<FLBAType> with a dense decode overload that writes raw
/// value bytes instead of per-value FixedLenByteArray pointers.
class FLBADecoder : virtual public TypedDecoder<FLBAType> {
public:
// Re-expose the TypedDecoder overloads; the Decode declared below would
// otherwise hide the inherited Decode by name.
using TypedDecoder<FLBAType>::Decode;
using TypedDecoder<FLBAType>::DecodeSpaced;

/// \brief Decode values into a densely packed buffer
///
/// Unlike Decode(FixedLenByteArray*, int), which writes one pointer per
/// value, this writes the raw fixed-width values back to back, with no
/// per-value pointers and no gaps.
///
/// The default implementation throws ParquetException; decoders for encodings
/// that support dense decoding override it.
///
/// \param[out] buffer destination for decoded values; caller owns it and
/// must size it to at least max_values * descr->type_length() bytes.
/// \param[in] max_values max values to decode.
/// \return The number of values decoded. Should be identical to max_values
/// except at the end of the current data page.
///
/// \note API EXPERIMENTAL
virtual int Decode(uint8_t* buffer, int max_values);
};

PARQUET_EXPORT
Expand Down
94 changes: 94 additions & 0 deletions cpp/src/parquet/encoding_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2660,4 +2660,98 @@ TEST(DeltaByteArrayEncodingAdHoc, ArrowDirectPut) {
}
}

// ----------------------------------------------------------------------
// Dense FIXED_LEN_BYTE_ARRAY decode tests
//
// FLBADecoder::Decode(uint8_t*, int) writes decoded values back to back into a
// densely packed buffer, with no per-value FixedLenByteArray pointers. Verify
// it for every encoding that overrides it: PLAIN, RLE_DICTIONARY,
// DELTA_BYTE_ARRAY and BYTE_STREAM_SPLIT.

// Fixture shared by the dense FLBA decode tests. SetUp generates kNumValues
// FLBA values; CheckDenseDecode runs a decoder's dense Decode and compares the
// output bytes against those values.
class TestFLBADenseDecode : public ::testing::Test {
 public:
  // Builds the example FLBA column descriptor and fills draws_ with generated
  // values (their bytes are backed by data_buffer_).
  void SetUp() override {
    descr_ = ExampleDescr<FLBAType>();
    type_length_ = descr_->type_length();
    draws_.resize(kNumValues);
    GenerateData<FLBA>(kNumValues, draws_.data(), &data_buffer_);
  }

  // Densely decodes kNumValues values from `decoder` and asserts that value i
  // occupies bytes [i * type_length_, (i + 1) * type_length_) of the output and
  // equals draws_[i]. A null decoder is a fatal failure.
  void CheckDenseDecode(FLBADecoder* decoder) {
    ASSERT_NE(nullptr, decoder);
    const size_t dense_size = static_cast<size_t>(type_length_) * kNumValues;
    std::vector<uint8_t> dense(dense_size);
    const int values_decoded = decoder->Decode(dense.data(), kNumValues);
    ASSERT_EQ(kNumValues, values_decoded);

    // Walk the dense output one value at a time.
    const uint8_t* actual = dense.data();
    for (int i = 0; i < kNumValues; ++i, actual += type_length_) {
      ASSERT_EQ(0, memcmp(actual, draws_[i].ptr, type_length_))
          << "mismatch at value " << i;
    }
  }

 protected:
  static constexpr int kNumValues = 1000;
  int type_length_;
  std::vector<FLBA> draws_;
  std::vector<uint8_t> data_buffer_;
  std::shared_ptr<ColumnDescriptor> descr_;
};

// PLAIN round trip: encode the draws, then densely decode them.
TEST_F(TestFLBADenseDecode, Plain) {
  auto plain_encoder = MakeTypedEncoder<FLBAType>(
      Encoding::PLAIN, /*use_dictionary=*/false, descr_.get());
  plain_encoder->Put(draws_.data(), kNumValues);
  auto encoded = plain_encoder->FlushValues();
  const int encoded_size = static_cast<int>(encoded->size());

  auto plain_decoder = MakeTypedDecoder<FLBAType>(Encoding::PLAIN, descr_.get());
  plain_decoder->SetData(kNumValues, encoded->data(), encoded_size);

  // The dense overload lives on FLBADecoder, hence the cross-cast.
  auto* flba_decoder = dynamic_cast<FLBADecoder*>(plain_decoder.get());
  ASSERT_NO_FATAL_FAILURE(CheckDenseDecode(flba_decoder));
}

// Dictionary round trip: dictionary-encode the draws, load the dictionary page
// through a PLAIN decoder, then densely decode the index page.
TEST_F(TestFLBADenseDecode, Dictionary) {
  auto owned_encoder = MakeEncoder(::parquet::Type::FIXED_LEN_BYTE_ARRAY, Encoding::PLAIN,
                                   /*use_dictionary=*/true, descr_.get());
  // Two views of the same encoder object: one for values, one for the dictionary.
  auto* value_encoder = dynamic_cast<TypedEncoder<FLBAType>*>(owned_encoder.get());
  auto* dict_encoder = dynamic_cast<DictEncoder<FLBAType>*>(owned_encoder.get());

  value_encoder->Put(draws_.data(), kNumValues);
  auto dict_page =
      AllocateBuffer(default_memory_pool(), dict_encoder->dict_encoded_size());
  dict_encoder->WriteDict(dict_page->mutable_data());
  auto index_page = value_encoder->FlushValues();

  auto dict_page_decoder = MakeTypedDecoder<FLBAType>(Encoding::PLAIN, descr_.get());
  const int dict_page_size = static_cast<int>(dict_page->size());
  dict_page_decoder->SetData(dict_encoder->num_entries(), dict_page->data(),
                             dict_page_size);

  auto index_decoder = MakeDictDecoder<FLBAType>(descr_.get());
  index_decoder->SetDict(dict_page_decoder.get());
  const int index_page_size = static_cast<int>(index_page->size());
  index_decoder->SetData(kNumValues, index_page->data(), index_page_size);

  // dict_page_decoder is kept alive until after the decode.
  // NOTE(review): the dense Decode copies value bytes into the caller's buffer,
  // so whether the dictionary source must outlive the decode depends on what
  // SetDict retains - confirm.
  auto* flba_decoder = dynamic_cast<FLBADecoder*>(index_decoder.get());
  ASSERT_NO_FATAL_FAILURE(CheckDenseDecode(flba_decoder));
}

// DELTA_BYTE_ARRAY round trip: encode the draws, then densely decode them.
TEST_F(TestFLBADenseDecode, DeltaByteArray) {
  auto delta_encoder = MakeTypedEncoder<FLBAType>(
      Encoding::DELTA_BYTE_ARRAY, /*use_dictionary=*/false, descr_.get());
  delta_encoder->Put(draws_.data(), kNumValues);
  auto encoded = delta_encoder->FlushValues();
  const int encoded_size = static_cast<int>(encoded->size());

  auto delta_decoder =
      MakeTypedDecoder<FLBAType>(Encoding::DELTA_BYTE_ARRAY, descr_.get());
  delta_decoder->SetData(kNumValues, encoded->data(), encoded_size);

  // The dense overload lives on FLBADecoder, hence the cross-cast.
  auto* flba_decoder = dynamic_cast<FLBADecoder*>(delta_decoder.get());
  ASSERT_NO_FATAL_FAILURE(CheckDenseDecode(flba_decoder));
}

// BYTE_STREAM_SPLIT round trip: encode the draws, then densely decode them.
TEST_F(TestFLBADenseDecode, ByteStreamSplit) {
  auto bss_encoder = MakeTypedEncoder<FLBAType>(
      Encoding::BYTE_STREAM_SPLIT, /*use_dictionary=*/false, descr_.get());
  bss_encoder->Put(draws_.data(), kNumValues);
  auto encoded = bss_encoder->FlushValues();
  const int encoded_size = static_cast<int>(encoded->size());

  auto bss_decoder =
      MakeTypedDecoder<FLBAType>(Encoding::BYTE_STREAM_SPLIT, descr_.get());
  bss_decoder->SetData(kNumValues, encoded->data(), encoded_size);

  // The dense overload lives on FLBADecoder, hence the cross-cast.
  auto* flba_decoder = dynamic_cast<FLBADecoder*>(bss_decoder.get());
  ASSERT_NO_FATAL_FAILURE(CheckDenseDecode(flba_decoder));
}

} // namespace parquet::test
Loading