Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions cpp/src/arrow/compute/kernels/chunked_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ std::vector<int64_t> ChunkedIndexMapper::GetChunkLengths(
return chunk_lengths;
}

Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
ChunkedIndexMapper::LogicalToPhysical() {
Result<std::span<CompressedChunkLocation>> ChunkedIndexMapper::LogicalToPhysical() {
// Check that indices would fall in bounds for CompressedChunkLocation
if (ARROW_PREDICT_FALSE(chunk_lengths_.size() >
CompressedChunkLocation::kMaxChunkIndex + 1)) {
Expand All @@ -67,13 +66,13 @@ ChunkedIndexMapper::LogicalToPhysical() {
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
const int64_t num_indices = static_cast<int64_t>(indices_.size());
DCHECK_EQ(num_indices, std::accumulate(chunk_lengths_.begin(), chunk_lengths_.end(),
static_cast<int64_t>(0)));
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
DCHECK_EQ(physical_begin + num_indices,
reinterpret_cast<CompressedChunkLocation*>(indices_end_));
reinterpret_cast<CompressedChunkLocation*>(indices_.data());
DCHECK_EQ(physical_begin + num_indices, reinterpret_cast<CompressedChunkLocation*>(
indices_.data() + indices_.size()));

int64_t chunk_offset = 0;
for (int64_t chunk_index = 0; chunk_index < static_cast<int64_t>(chunk_lengths_.size());
Expand All @@ -82,17 +81,17 @@ ChunkedIndexMapper::LogicalToPhysical() {
for (int64_t i = 0; i < chunk_length; ++i) {
// Logical indices are expected to be chunk-partitioned, which avoids costly
// chunked index resolution.
DCHECK_GE(indices_begin_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
DCHECK_LT(indices_begin_[chunk_offset + i],
DCHECK_GE(indices_[chunk_offset + i], static_cast<uint64_t>(chunk_offset));
DCHECK_LT(indices_[chunk_offset + i],
static_cast<uint64_t>(chunk_offset + chunk_length));
physical_begin[chunk_offset + i] = CompressedChunkLocation{
static_cast<uint64_t>(chunk_index),
indices_begin_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
indices_[chunk_offset + i] - static_cast<uint64_t>(chunk_offset)};
}
chunk_offset += chunk_length;
}

return std::pair{physical_begin, physical_begin + num_indices};
return std::span<CompressedChunkLocation>{physical_begin, physical_begin + num_indices};
}

Status ChunkedIndexMapper::PhysicalToLogical() {
Expand All @@ -105,15 +104,15 @@ Status ChunkedIndexMapper::PhysicalToLogical() {
}
}

const int64_t num_indices = static_cast<int64_t>(indices_end_ - indices_begin_);
const int64_t num_indices = static_cast<int64_t>(indices_.size());
CompressedChunkLocation* physical_begin =
reinterpret_cast<CompressedChunkLocation*>(indices_begin_);
reinterpret_cast<CompressedChunkLocation*>(indices_.data());
for (int64_t i = 0; i < num_indices; ++i) {
const auto loc = physical_begin[i];
DCHECK_LT(loc.chunk_index(), chunk_offsets.size());
DCHECK_LT(loc.index_in_chunk(),
static_cast<uint64_t>(chunk_lengths_[loc.chunk_index()]));
indices_begin_[i] =
indices_[i] =
chunk_offsets[loc.chunk_index()] + static_cast<int64_t>(loc.index_in_chunk());
}

Expand Down
25 changes: 8 additions & 17 deletions cpp/src/arrow/compute/kernels/chunked_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,27 +127,19 @@ ARROW_EXPORT std::vector<const Array*> GetArrayPointers(const ArrayVector& array
// and vice-versa.
class ARROW_EXPORT ChunkedIndexMapper {
public:
ChunkedIndexMapper(const std::vector<const Array*>& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: ChunkedIndexMapper(std::span(chunks), indices_begin, indices_end) {}
ChunkedIndexMapper(std::span<const Array* const> chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
ChunkedIndexMapper(const RecordBatchVector& chunks, uint64_t* indices_begin,
uint64_t* indices_end)
: chunk_lengths_(GetChunkLengths(chunks)),
indices_begin_(indices_begin),
indices_end_(indices_end) {}
ChunkedIndexMapper(const std::vector<const Array*>& chunks, std::span<uint64_t> indices)
: ChunkedIndexMapper(std::span(chunks), indices) {}
ChunkedIndexMapper(std::span<const Array* const> chunks, std::span<uint64_t> indices)
: chunk_lengths_(GetChunkLengths(chunks)), indices_(indices) {}
ChunkedIndexMapper(const RecordBatchVector& chunks, std::span<uint64_t> indices)
: chunk_lengths_(GetChunkLengths(chunks)), indices_(indices) {}

// Turn the original uint64_t logical indices into physical. This reuses the
// same memory area, so the logical indices cannot be used anymore until
// PhysicalToLogical() is called.
//
// This assumes that the logical indices are originally chunk-partitioned.
Result<std::pair<CompressedChunkLocation*, CompressedChunkLocation*>>
LogicalToPhysical();
Result<std::span<CompressedChunkLocation>> LogicalToPhysical();

// Turn the physical indices back into logical, making the uint64_t indices
// usable again.
Expand All @@ -158,8 +150,7 @@ class ARROW_EXPORT ChunkedIndexMapper {
static std::vector<int64_t> GetChunkLengths(const RecordBatchVector& chunks);

std::vector<int64_t> chunk_lengths_;
uint64_t* indices_begin_;
uint64_t* indices_end_;
std::span<uint64_t> indices_;
};

} // namespace arrow::compute::internal
Loading
Loading