Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ set(PAIMON_CORE_SRCS
core/io/data_file_meta_first_row_id_legacy_serializer.cpp
core/io/data_file_meta.cpp
core/io/data_file_meta_serializer.cpp
core/io/chain_data_file_path_factory.cpp
core/io/data_file_path_factory.cpp
core/io/append_data_file_writer_factory.cpp
core/io/blob_data_file_writer_factory.cpp
Expand Down Expand Up @@ -733,6 +734,7 @@ if(PAIMON_BUILD_TESTS)
core/table/source/table_read_test.cpp
core/table/source/append_count_reader_test.cpp
core/table/source/pk_count_reader_test.cpp
core/table/source/chain_data_split_test.cpp
core/table/source/data_split_test.cpp
core/table/source/deletion_file_test.cpp
core/table/source/split_generator_test.cpp
Expand Down
45 changes: 45 additions & 0 deletions src/paimon/core/io/chain_data_file_path_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/io/chain_data_file_path_factory.h"

#include <utility>

#include "paimon/common/utils/path_util.h"
#include "paimon/core/io/data_file_meta.h"

namespace paimon {

// Builds a chained factory around `fallback`.
//
// Both arguments are taken by value and moved into the members, so callers may
// either copy or hand over their own instances. `fallback` is dereferenced by
// ToPath() whenever a file is not covered by `file_bucket_path_mapping`.
ChainDataFilePathFactory::ChainDataFilePathFactory(
    std::shared_ptr<DataFilePathFactory> fallback,
    std::unordered_map<std::string, std::string> file_bucket_path_mapping)
    : fallback_(std::move(fallback)),
      file_bucket_path_mapping_(std::move(file_bucket_path_mapping)) {
}

// Resolves the full path of the data file described by `file_meta`.
//
// Resolution order:
//   1. the external path recorded on the meta, if one is set;
//   2. the mapped bucket path for the file name, joined with the file name;
//   3. whatever the fallback factory returns for the same meta.
std::string ChainDataFilePathFactory::ToPath(const std::shared_ptr<DataFileMeta>& file_meta) const {
    const auto& external_path = file_meta->external_path;
    if (external_path) {
        return external_path.value();
    }

    const auto& file_name = file_meta->file_name;
    const auto mapped_entry = file_bucket_path_mapping_.find(file_name);
    if (mapped_entry == file_bucket_path_mapping_.end()) {
        // No dedicated bucket path for this file: defer to the wrapped factory.
        return fallback_->ToPath(file_meta);
    }

    return PathUtil::JoinPath(mapped_entry->second, file_name);
}

} // namespace paimon
39 changes: 39 additions & 0 deletions src/paimon/core/io/chain_data_file_path_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string>
#include <unordered_map>

#include "paimon/core/io/data_file_path_factory.h"

namespace paimon {

/// A DataFilePathFactory whose meta-based ToPath() consults a per-file mapping
/// before delegating to a wrapped factory.
///
/// Only the ToPath(const std::shared_ptr<DataFileMeta>&) overload is overridden
/// here; every other member is inherited unchanged from DataFilePathFactory.
class ChainDataFilePathFactory : public DataFilePathFactory {
public:
/// @param fallback factory used for files that have no entry in the mapping.
/// @param file_bucket_path_mapping map keyed by data file name; the mapped
///        value is the path the file name is joined onto.
ChainDataFilePathFactory(std::shared_ptr<DataFilePathFactory> fallback,
std::unordered_map<std::string, std::string> file_bucket_path_mapping);

/// Returns the path for `file_meta`: its external path when set, otherwise the
/// mapped path joined with the file name, otherwise the fallback's result.
std::string ToPath(const std::shared_ptr<DataFileMeta>& file_meta) const override;

private:
// Factory delegated to when a file name is absent from the mapping.
std::shared_ptr<DataFilePathFactory> fallback_;
// Data file name -> path prefix for that file.
std::unordered_map<std::string, std::string> file_bucket_path_mapping_;
};

} // namespace paimon
2 changes: 1 addition & 1 deletion src/paimon/core/io/data_file_path_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class DataFilePathFactory : public PathFactory {
}

std::string ToPath(const std::string& file_name) const override;
std::string ToPath(const std::shared_ptr<DataFileMeta>& file_meta) const;
virtual std::string ToPath(const std::shared_ptr<DataFileMeta>& file_meta) const;

const std::string& GetUUID() const {
return uuid_;
Expand Down
17 changes: 17 additions & 0 deletions src/paimon/core/operation/abstract_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
#include "paimon/common/table/special_fields.h"
#include "paimon/common/types/data_field.h"
#include "paimon/common/utils/object_utils.h"
#include "paimon/core/io/chain_data_file_path_factory.h"
#include "paimon/core/io/complete_row_tracking_fields_reader.h"
#include "paimon/core/io/data_file_meta.h"
#include "paimon/core/io/data_file_path_factory.h"
#include "paimon/core/io/field_mapping_reader.h"
#include "paimon/core/operation/internal_read_context.h"
#include "paimon/core/partition/partition_info.h"
#include "paimon/core/schema/table_schema.h"
#include "paimon/core/table/source/chain_data_split_impl.h"
#include "paimon/core/table/source/data_split_impl.h"
#include "paimon/core/utils/field_mapping.h"
#include "paimon/core/utils/nested_projection_utils.h"
Expand Down Expand Up @@ -116,6 +118,21 @@ Result<std::unique_ptr<BatchReader>> AbstractSplitRead::ApplyPredicateFilterIfNe
return PredicateBatchReader::Create(std::move(reader), predicate, pool_);
}

// Creates the data-file path factory to use when reading `data_split`.
//
// A factory for the split's partition and bucket is always created first. When
// the split is a ChainDataSplitImpl, that factory is wrapped in a
// ChainDataFilePathFactory carrying the split's file -> bucket-path mapping;
// otherwise it is returned as is. Any error from the underlying factory
// creation is propagated to the caller.
Result<std::shared_ptr<DataFilePathFactory>> AbstractSplitRead::CreateDataFilePathFactory(
    const std::shared_ptr<DataSplitImpl>& data_split) const {
    PAIMON_ASSIGN_OR_RAISE(
        std::shared_ptr<DataFilePathFactory> default_factory,
        path_factory_->CreateDataFilePathFactory(data_split->Partition(), data_split->Bucket()));

    if (auto chain_split = std::dynamic_pointer_cast<ChainDataSplitImpl>(data_split)) {
        return std::make_shared<ChainDataFilePathFactory>(default_factory,
                                                          chain_split->FileBucketPathMapping());
    }

    // Plain data split: the partition/bucket factory is sufficient.
    return default_factory;
}

Result<std::unique_ptr<ReaderBuilder>> AbstractSplitRead::PrepareReaderBuilder(
const std::string& format_identifier) const {
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> file_format,
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/operation/abstract_split_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class AbstractSplitRead : public SplitRead {
Result<std::unique_ptr<BatchReader>> ApplyPredicateFilterIfNeeded(
std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const;

// Creates the data file path factory for the partition and bucket of
// `data_split`; for a ChainDataSplitImpl the result is a
// ChainDataFilePathFactory built from the split's file bucket path mapping.
// `data_split` is dereferenced unconditionally and must not be null.
Result<std::shared_ptr<DataFilePathFactory>> CreateDataFilePathFactory(
const std::shared_ptr<DataSplitImpl>& data_split) const;

protected:
// return nullptr if file is skipped by index or dv
virtual Result<std::unique_ptr<FileBatchReader>> ApplyIndexAndDvReaderIfNeeded(
Expand Down
9 changes: 4 additions & 5 deletions src/paimon/core/operation/data_evolution_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,13 @@ Result<std::unordered_set<BlobViewStruct>> DataEvolutionSplitRead::ExtractBlobVi
Result<std::unique_ptr<BatchReader>> DataEvolutionSplitRead::InnerCreateReader(
const std::shared_ptr<DataSplit>& data_split,
const std::optional<std::vector<Range>>& row_ranges) const {
auto split_impl = dynamic_cast<DataSplitImpl*>(data_split.get());
if (split_impl == nullptr) {
auto split_impl = std::dynamic_pointer_cast<DataSplitImpl>(data_split);
if (!split_impl) {
return Status::Invalid("unexpected error, split cast to impl failed");
}
assert(raw_read_schema_->num_fields() > 0);
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<DataFilePathFactory> data_file_path_factory,
path_factory_->CreateDataFilePathFactory(split_impl->Partition(), split_impl->Bucket()));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> data_file_path_factory,
CreateDataFilePathFactory(split_impl));
auto metas = split_impl->DataFiles();
PAIMON_ASSIGN_OR_RAISE(std::vector<std::vector<std::shared_ptr<DataFileMeta>>> split_by_row_id,
MergeRangesAndSort(std::move(metas)));
Expand Down
5 changes: 2 additions & 3 deletions src/paimon/core/operation/merge_file_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ Result<std::unique_ptr<BatchReader>> MergeFileSplitRead::CreateReader(
if (!data_split->BeforeFiles().empty()) {
return Status::Invalid("this read cannot accept split with before files.");
}
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<DataFilePathFactory> data_file_path_factory,
path_factory_->CreateDataFilePathFactory(data_split->Partition(), data_split->Bucket()));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> data_file_path_factory,
CreateDataFilePathFactory(data_split));
std::unique_ptr<BatchReader> batch_reader;
if (data_split->IsStreaming() || data_split->Bucket() == BucketModeDefine::POSTPONE_BUCKET) {
PAIMON_ASSIGN_OR_RAISE(
Expand Down
17 changes: 13 additions & 4 deletions src/paimon/core/operation/raw_file_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,26 @@ Result<std::unique_ptr<BatchReader>> RawFileSplitRead::CreateReader(
if (!data_split) {
return Status::Invalid("cannot cast split to data_split in RawFileSplitRead");
}
auto dv_factory = DeletionVector::CreateFactory(
options_.GetFileSystem(),
DeletionVector::CreateDeletionFileMap(data_split->DataFiles(), data_split->DeletionFiles()),
pool_);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> data_file_path_factory,
CreateDataFilePathFactory(data_split));
return CreateReader(data_split->Partition(), data_split->Bucket(), data_split->DataFiles(),
data_split->DeletionFiles());
dv_factory, data_file_path_factory);
}

Result<std::unique_ptr<BatchReader>> RawFileSplitRead::CreateReader(
const BinaryRow& partition, int32_t bucket,
const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
DeletionVector::Factory dv_factory) {
DeletionVector::Factory dv_factory,
std::shared_ptr<DataFilePathFactory> data_file_path_factory) {
const auto& predicate = context_->GetPredicate();
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> data_file_path_factory,
path_factory_->CreateDataFilePathFactory(partition, bucket));
if (!data_file_path_factory) {
PAIMON_ASSIGN_OR_RAISE(data_file_path_factory,
path_factory_->CreateDataFilePathFactory(partition, bucket));
}

PAIMON_ASSIGN_OR_RAISE(
std::vector<std::unique_ptr<FileBatchReader>> raw_file_readers,
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/core/operation/raw_file_split_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class RawFileSplitRead : public AbstractSplitRead {

Result<std::unique_ptr<BatchReader>> CreateReader(
const BinaryRow& partition, int32_t bucket,
const std::vector<std::shared_ptr<DataFileMeta>>& files,
DeletionVector::Factory dv_factory);
const std::vector<std::shared_ptr<DataFileMeta>>& files, DeletionVector::Factory dv_factory,
std::shared_ptr<DataFilePathFactory> data_file_path_factory = nullptr);

Result<bool> Match(const std::shared_ptr<Split>& split, bool force_keep_delete) const override;

Expand Down
70 changes: 70 additions & 0 deletions src/paimon/core/table/source/chain_data_split_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "paimon/common/data/binary_row.h"
#include "paimon/core/table/source/data_split_impl.h"

namespace paimon {

/// A DataSplitImpl that additionally carries per-file lookup tables
/// (file -> bucket path, file -> branch), a read partition and an
/// "all snapshot" flag.
///
/// The base-class state is copied from an existing DataSplitImpl at
/// construction time; the extra fields are exposed through read-only accessors.
class ChainDataSplitImpl : public DataSplitImpl {
public:
// NOTE(review): not referenced anywhere in this header; presumably a
// placeholder bucket path for splits whose files do not share a single
// physical bucket path — confirm against the callers.
static constexpr const char* VIRTUAL_BUCKET_PATH = "placeholder::virtual-bucket-path";

/// Builds a chain split on top of `base_split`.
///
/// Partition, bucket, bucket path and a copy of the data file list are taken
/// from `base_split`; CopyMetadataFrom() then transfers the rest of its
/// metadata. `base_split` is dereferenced unconditionally and must not be null.
///
/// @param base_split split providing the DataSplitImpl state.
/// @param all_snapshot_split value returned by AllSnapshotSplit().
/// @param read_partition value returned by ReadPartition() (stored as a copy).
/// @param file_bucket_path_mapping moved into the split; see FileBucketPathMapping().
/// @param file_branch_mapping moved into the split; see FileBranchMapping().
ChainDataSplitImpl(const std::shared_ptr<DataSplitImpl>& base_split, bool all_snapshot_split,
const BinaryRow& read_partition,
std::unordered_map<std::string, std::string>&& file_bucket_path_mapping,
std::unordered_map<std::string, std::string>&& file_branch_mapping)
: DataSplitImpl(base_split->Partition(), base_split->Bucket(), base_split->BucketPath(),
std::vector<std::shared_ptr<DataFileMeta>>(base_split->DataFiles())),
all_snapshot_split_(all_snapshot_split),
read_partition_(read_partition),
file_bucket_path_mapping_(std::move(file_bucket_path_mapping)),
file_branch_mapping_(std::move(file_branch_mapping)) {
// The base constructor only receives partition/bucket/path/files; pull the
// remaining metadata over from the source split.
CopyMetadataFrom(*base_split);
}

/// Returns the flag supplied at construction.
// NOTE(review): presumably marks a split taken entirely from snapshot data — confirm.
bool AllSnapshotSplit() const {
return all_snapshot_split_;
}

/// Returns the read partition supplied at construction.
// NOTE(review): may differ from the base Partition(); exact semantics not visible here.
const BinaryRow& ReadPartition() const {
return read_partition_;
}

/// Returns the file -> bucket path table; this is the mapping handed to
/// ChainDataFilePathFactory when a reader resolves file paths.
const std::unordered_map<std::string, std::string>& FileBucketPathMapping() const {
return file_bucket_path_mapping_;
}

/// Returns the file -> branch table supplied at construction.
// NOTE(review): presumably keyed by data file name like the bucket path table — confirm.
const std::unordered_map<std::string, std::string>& FileBranchMapping() const {
return file_branch_mapping_;
}

private:
bool all_snapshot_split_;
BinaryRow read_partition_;
std::unordered_map<std::string, std::string> file_bucket_path_mapping_;
std::unordered_map<std::string, std::string> file_branch_mapping_;
};

} // namespace paimon
Loading
Loading