From 1bfdf4382807005ce75c1bf591101b117189d992 Mon Sep 17 00:00:00 2001 From: gripleaf <425797155@qq.com> Date: Mon, 29 Jun 2026 14:17:57 +0800 Subject: [PATCH] feat(manifest): support snapshot manifest cache --- docs/source/user_guide.rst | 1 + .../user_guide/manifest_entry_cache.rst | 84 ++++++++ include/paimon/cache/cache.h | 3 + include/paimon/defs.h | 5 + src/paimon/CMakeLists.txt | 1 + src/paimon/common/defs.cpp | 2 + src/paimon/common/io/cache/cache_key.cpp | 14 ++ src/paimon/common/io/cache/lru_cache_test.cpp | 12 ++ src/paimon/core/core_options.cpp | 13 ++ src/paimon/core/core_options.h | 1 + src/paimon/core/core_options_test.cpp | 6 + .../snapshot_live_manifest_entries.cpp | 142 +++++++++++++ .../manifest/snapshot_live_manifest_entries.h | 65 ++++++ src/paimon/core/operation/file_store_scan.cpp | 196 +++++++++++++++--- src/paimon/core/operation/file_store_scan.h | 36 +++- .../core/operation/file_store_scan_test.cpp | 84 ++++++++ .../core/operation/metrics/scan_metrics.h | 5 + src/paimon/core/table/source/table_scan.cpp | 41 +++- 18 files changed, 675 insertions(+), 36 deletions(-) create mode 100644 docs/source/user_guide/manifest_entry_cache.rst create mode 100644 src/paimon/core/manifest/snapshot_live_manifest_entries.cpp create mode 100644 src/paimon/core/manifest/snapshot_live_manifest_entries.h diff --git a/docs/source/user_guide.rst b/docs/source/user_guide.rst index 4c497b5c3..e9d109cea 100644 --- a/docs/source/user_guide.rst +++ b/docs/source/user_guide.rst @@ -25,6 +25,7 @@ User Guide user_guide/snapshot user_guide/manifest user_guide/manifest_cache + user_guide/manifest_entry_cache user_guide/parquet_metadata_cache user_guide/data_types user_guide/primary_key_table diff --git a/docs/source/user_guide/manifest_entry_cache.rst b/docs/source/user_guide/manifest_entry_cache.rst new file mode 100644 index 000000000..2885e840b --- /dev/null +++ b/docs/source/user_guide/manifest_entry_cache.rst @@ -0,0 +1,84 @@ +.. Copyright 2026-present Alibaba Inc. 
+ +.. Licensed under the Apache License, Version 2.0 (the "License"); +.. you may not use this file except in compliance with the License. +.. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +Manifest Entry Cache +==================== + +Overview +-------- + +Large tables may contain many manifest entries, while a scan may only need a +small subset after snapshot, partition, bucket, and statistics pruning. The +snapshot-level manifest entry cache reduces repeated manifest decoding cost for +successive full scans. + +The cache stores decoded and merged live manifest entries by snapshot for +``ScanMode::ALL``. When a newer snapshot is scanned, paimon-cpp tries to build +the target snapshot incrementally from the latest cached snapshot by reading +only intermediate delta manifests. + +Request-specific filters are not stored in the cache. Partition, bucket, level, +and predicate filters are still evaluated for each scan, so cached entries can +be reused safely across different scan predicates. + +Configuration +------------- + +Manifest entry caching reuses the cache instance provided by +``ScanContextBuilder::WithCache()`` and stores the snapshot bundle under +``CacheKind::SNAPSHOT_LIVE_MANIFEST``: + +.. 
code-block:: cpp + + auto cache = std::make_shared<LruCache>(128 * 1024 * 1024); + ScanContextBuilder context_builder(table_path); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr<ScanContext> scan_context, + context_builder + .WithCache(cache) + .AddOption(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "3") + .Finish()); + +Cache entries are scoped by table path and branch, so they can be reused across +newly created ``TableScan`` and ``FileStoreScan`` instances as long as they +share the same cache object. + +``Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS`` controls how many snapshot +results are retained per table and branch. Older snapshot entries are evicted +first. The default value is ``0``, which disables the cache path. Set it to a +positive value to enable the cache when ``ScanContextBuilder::WithCache()`` is +also configured. + +If no cache is provided through ``ScanContextBuilder::WithCache()``, this +optimization is skipped. The snapshot manifest entry cache shares the same +``Cache`` interface with raw manifest and data-file footer caches, but it uses a +dedicated ``CacheKind`` and a table/branch key instead of file byte ranges. + +Limitations +----------- + +The cache is currently used only for ``ScanMode::ALL``. It is skipped for +row-range scans because row-range pruning is applied at manifest-meta level. + +Metrics +------- + +The scan metrics expose counters for the last scan: + +- ``lastManifestEntryCacheHit``: whether the target snapshot was served + directly from the cache. +- ``lastManifestEntryCacheIncrementalSnapshots``: how many intermediate + snapshots were applied during incremental construction. +- ``lastManifestEntryCacheLoadedManifests``: how many manifest files were + loaded for the cache path. 
diff --git a/include/paimon/cache/cache.h b/include/paimon/cache/cache.h index 89ed69047..6f0eabcb6 100644 --- a/include/paimon/cache/cache.h +++ b/include/paimon/cache/cache.h @@ -33,6 +33,7 @@ enum class CacheKind { DEFAULT, MANIFEST, DATA_FILE_FOOTER, + SNAPSHOT_LIVE_MANIFEST, }; class PAIMON_EXPORT CacheKey { @@ -41,6 +42,8 @@ class PAIMON_EXPORT CacheKey { int32_t length, bool is_index); static std::shared_ptr ForKind(const std::string& file_path, int64_t position, int32_t length, CacheKind kind); + static std::shared_ptr ForSnapshotLiveManifestEntries(const std::string& table_path, + const std::string& branch); public: virtual ~CacheKey() = default; diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 5c661c856..3a5678af4 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -166,6 +166,11 @@ struct PAIMON_EXPORT Options { /// "latest-full", "latest", "from-snapshot", "from-snapshot-full". Default value is "default". static const char SCAN_MODE[]; + /// "scan.manifest-entry-cache.max-snapshots" - Maximum number of snapshot manifest entry + /// results retained per table and branch. Setting it to 0 disables manifest entry cache. + /// Default value is 0. + static const char SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS[]; + /// "read.batch-size" - Read batch size for any file format if it supports. /// The default value is 1024. 
static const char READ_BATCH_SIZE[]; diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 3f99662aa..ff2482cc7 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -304,6 +304,7 @@ set(PAIMON_CORE_SRCS core/operation/raw_file_split_read.cpp core/operation/read_context.cpp core/operation/scan_context.cpp + core/manifest/snapshot_live_manifest_entries.cpp core/operation/write_context.cpp core/operation/write_restore.cpp core/postpone/postpone_bucket_writer.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 9d32b5d9b..e12ffa690 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -47,6 +47,8 @@ const char Options::SOURCE_SPLIT_TARGET_SIZE[] = "source.split.target-size"; const char Options::SOURCE_SPLIT_OPEN_FILE_COST[] = "source.split.open-file-cost"; const char Options::SCAN_SNAPSHOT_ID[] = "scan.snapshot-id"; const char Options::SCAN_MODE[] = "scan.mode"; +const char Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS[] = + "scan.manifest-entry-cache.max-snapshots"; const char Options::READ_BATCH_SIZE[] = "read.batch-size"; const char Options::WRITE_BATCH_SIZE[] = "write.batch-size"; const char Options::WRITE_BUFFER_SIZE[] = "write-buffer-size"; diff --git a/src/paimon/common/io/cache/cache_key.cpp b/src/paimon/common/io/cache/cache_key.cpp index 12732bb27..680e849ca 100644 --- a/src/paimon/common/io/cache/cache_key.cpp +++ b/src/paimon/common/io/cache/cache_key.cpp @@ -17,6 +17,14 @@ #include "paimon/common/io/cache/cache_key.h" namespace paimon { +namespace { + +std::string SnapshotLiveManifestEntriesCacheKey(const std::string& table_path, + const std::string& branch) { + return table_path + "#" + branch; +} + +} // namespace std::shared_ptr CacheKey::ForPosition(const std::string& file_path, int64_t position, int32_t length, bool is_index) { @@ -31,6 +39,12 @@ std::shared_ptr CacheKey::ForKind(const std::string& file_path, int64_ return key; } +std::shared_ptr 
CacheKey::ForSnapshotLiveManifestEntries(const std::string& table_path, + const std::string& branch) { + return ForKind(SnapshotLiveManifestEntriesCacheKey(table_path, branch), /*position=*/-1, + /*length=*/-1, CacheKind::SNAPSHOT_LIVE_MANIFEST); +} + bool PositionCacheKey::IsIndex() const { return is_index_; } diff --git a/src/paimon/common/io/cache/lru_cache_test.cpp b/src/paimon/common/io/cache/lru_cache_test.cpp index 3864c8224..8209bca52 100644 --- a/src/paimon/common/io/cache/lru_cache_test.cpp +++ b/src/paimon/common/io/cache/lru_cache_test.cpp @@ -381,6 +381,18 @@ TEST_F(LruCacheTest, TestForKindSetsKeyKind) { ASSERT_EQ(CacheKind::MANIFEST, put_key->GetKind()); } +TEST_F(LruCacheTest, TestForSnapshotLiveManifestEntries) { + auto main_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main"); + auto same_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "main"); + auto branch_key = CacheKey::ForSnapshotLiveManifestEntries("table_path", "dev"); + auto table_key = CacheKey::ForSnapshotLiveManifestEntries("other_table_path", "main"); + + ASSERT_EQ(CacheKind::SNAPSHOT_LIVE_MANIFEST, main_key->GetKind()); + ASSERT_TRUE(CacheKeyEqual()(main_key, same_key)); + ASSERT_FALSE(CacheKeyEqual()(main_key, branch_key)); + ASSERT_FALSE(CacheKeyEqual()(main_key, table_key)); +} + /// Verifies that multiple evictions happen when a single large entry is inserted. 
TEST_F(LruCacheTest, TestMultipleEvictions) { LruCache cache(300); diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 6e56c475b..f8db1f999 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -403,6 +403,7 @@ struct CoreOptions::Impl { int32_t bucket = -1; int32_t manifest_merge_min_count = 30; + int32_t scan_manifest_entry_cache_max_snapshots = 0; int32_t read_batch_size = 1024; int32_t write_batch_size = 1024; int32_t local_sort_max_num_file_handles = 128; @@ -717,6 +718,13 @@ struct CoreOptions::Impl { } // Parse scan.mode - scanning behavior of the source, default "default" PAIMON_RETURN_NOT_OK(parser.ParseStartupMode(&startup_mode)); + // Parse scan.manifest-entry-cache.max-snapshots - cache size by snapshot count + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, + &scan_manifest_entry_cache_max_snapshots)); + if (scan_manifest_entry_cache_max_snapshots < 0) { + return Status::Invalid(fmt::format("{} must be non-negative", + Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS)); + } // Parse scan.fallback-branch - fallback branch when partition not found PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_FALLBACK_BRANCH, &scan_fallback_branch)); // Parse branch - branch name, default "main" @@ -968,6 +976,11 @@ std::optional CoreOptions::GetScanSnapshotId() const { std::optional CoreOptions::GetScanTimestampMillis() const { return impl_->scan_timestamp_millis; } + +int32_t CoreOptions::GetScanManifestEntryCacheMaxSnapshots() const { + return impl_->scan_manifest_entry_cache_max_snapshots; +} + int64_t CoreOptions::GetManifestTargetFileSize() const { return impl_->manifest_target_file_size; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 21235790b..fb64eff02 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -78,6 +78,7 @@ class PAIMON_EXPORT CoreOptions { int64_t GetSourceSplitOpenFileCost() 
const; std::optional GetScanSnapshotId() const; std::optional GetScanTimestampMillis() const; + int32_t GetScanManifestEntryCacheMaxSnapshots() const; int64_t GetManifestTargetFileSize() const; std::shared_ptr GetCache() const; diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 23e59b9cf..cb1924593 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -55,6 +55,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(8 * 1024 * 1024L, core_options.GetManifestTargetFileSize()); ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize()); ASSERT_EQ(30, core_options.GetManifestMergeMinCount()); + ASSERT_EQ(0, core_options.GetScanManifestEntryCacheMaxSnapshots()); ASSERT_EQ(nullptr, core_options.GetCache()); ASSERT_EQ(128 * 1024 * 1024L, core_options.GetSourceSplitTargetSize()); ASSERT_EQ(4 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost()); @@ -184,6 +185,7 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::COMMIT_MAX_RETRIES, "20"}, {Options::SCAN_SNAPSHOT_ID, "5"}, {Options::SCAN_MODE, "from-snapshot-full"}, + {Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "7"}, {Options::SNAPSHOT_NUM_RETAINED_MIN, "15"}, {Options::SNAPSHOT_NUM_RETAINED_MAX, "30"}, {Options::SNAPSHOT_EXPIRE_LIMIT, "20"}, @@ -305,6 +307,7 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(120 * 1000, core_options.GetCommitTimeout()); ASSERT_EQ(20, core_options.GetCommitMaxRetries()); ASSERT_EQ(5, core_options.GetScanSnapshotId().value_or(-1)); + ASSERT_EQ(7, core_options.GetScanManifestEntryCacheMaxSnapshots()); ExpireConfig expire_config = core_options.GetExpireConfig(); ASSERT_EQ(15, expire_config.GetSnapshotRetainMin()); ASSERT_EQ(30, expire_config.GetSnapshotRetainMax()); @@ -432,6 +435,9 @@ TEST(CoreOptionsTest, TestInvalidCase) { "invalid lookup mode: invalid"); ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::LOOKUP_COMPACT_MAX_INTERVAL, "invalid"}}), "Invalid Config 
[lookup-compact.max-interval: invalid]"); + ASSERT_NOK_WITH_MSG( + CoreOptions::FromMap({{Options::SCAN_MANIFEST_ENTRY_CACHE_MAX_SNAPSHOTS, "-1"}}), + "scan.manifest-entry-cache.max-snapshots must be non-negative"); ASSERT_NOK_WITH_MSG( CoreOptions::FromMap({{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "1.1"}}), "The high priority pool ratio should in the range [0, 1), while input is 1.1"); diff --git a/src/paimon/core/manifest/snapshot_live_manifest_entries.cpp b/src/paimon/core/manifest/snapshot_live_manifest_entries.cpp new file mode 100644 index 000000000..72c3bd8b5 --- /dev/null +++ b/src/paimon/core/manifest/snapshot_live_manifest_entries.cpp @@ -0,0 +1,142 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/io/memory_segment_output_stream.h" +#include "paimon/core/manifest/manifest_entry_serializer.h" +#include "paimon/io/byte_array_input_stream.h" +#include "paimon/io/data_input_stream.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/memory/memory_segment.h" + +namespace paimon { +namespace { + +constexpr int32_t kMagic = 0x534d4543; // SMEC +constexpr int32_t kVersion = 1; + +size_t NormalizeMaxSnapshots(int32_t max_snapshots) { + return static_cast(std::max(0, max_snapshots)); +} + +std::shared_ptr ToBytes(const MemorySegmentOutputStream& out, + const std::shared_ptr& pool) { + auto bytes = Bytes::AllocateBytes(static_cast(out.CurrentSize()), pool.get()); + int64_t offset = 0; + for (const auto& segment : out.Segments()) { + int64_t copy_size = + std::min(segment.Size(), static_cast(bytes->size()) - offset); + if (copy_size <= 0) { + break; + } + std::memcpy(bytes->data() + offset, segment.Data(), static_cast(copy_size)); + offset += copy_size; + } + return bytes; +} + +} // namespace + +SnapshotLiveManifestEntries::SnapshotLiveManifestEntries(int32_t max_snapshots) + : max_snapshots_(max_snapshots) {} + +std::optional SnapshotLiveManifestEntries::LatestBeforeOrEqual( + int64_t snapshot_id) const { + auto iter = entries_by_snapshot_.upper_bound(snapshot_id); + if (iter == entries_by_snapshot_.begin()) { + return std::optional(); + } + --iter; + return Entry{iter->first, iter->second}; +} + +void SnapshotLiveManifestEntries::Put(int64_t snapshot_id, std::vector&& entries) { + if (NormalizeMaxSnapshots(max_snapshots_) == 0) { + return; + } + entries_by_snapshot_[snapshot_id] = std::move(entries); + EvictIfNeeded(); +} + +size_t SnapshotLiveManifestEntries::Size() const { + return entries_by_snapshot_.size(); +} + +Result> SnapshotLiveManifestEntries::Serialize( + 
const std::shared_ptr& pool) const { + MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool); + out.WriteValue(kMagic); + out.WriteValue(kVersion); + out.WriteValue(static_cast(entries_by_snapshot_.size())); + + ManifestEntrySerializer serializer(pool); + for (const auto& [snapshot_id, entries] : entries_by_snapshot_) { + out.WriteValue(snapshot_id); + PAIMON_RETURN_NOT_OK(serializer.SerializeList(entries, &out)); + } + return ToBytes(out, pool); +} + +Result SnapshotLiveManifestEntries::Deserialize( + const MemorySegment& segment, int32_t max_snapshots, const std::shared_ptr& pool) { + SnapshotLiveManifestEntries snapshot_live_manifest_entries(max_snapshots); + if (segment.Data() == nullptr || segment.Size() == 0) { + return snapshot_live_manifest_entries; + } + + auto bytes = segment.GetOrCreateHeapMemory(pool.get()); + auto input_stream = std::make_shared(bytes->data(), bytes->size()); + DataInputStream in(input_stream); + + PAIMON_ASSIGN_OR_RAISE(int32_t magic, in.ReadValue()); + if (magic != kMagic) { + return Status::Invalid("invalid snapshot live manifest entries magic"); + } + PAIMON_ASSIGN_OR_RAISE(int32_t version, in.ReadValue()); + if (version != kVersion) { + return Status::Invalid( + fmt::format("unsupported snapshot live manifest entries version {}", version)); + } + PAIMON_ASSIGN_OR_RAISE(int32_t snapshot_count, in.ReadValue()); + if (snapshot_count < 0) { + return Status::Invalid("snapshot live manifest entries snapshot count is negative"); + } + + ManifestEntrySerializer serializer(pool); + for (int32_t i = 0; i < snapshot_count; i++) { + PAIMON_ASSIGN_OR_RAISE(int64_t snapshot_id, in.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(std::vector entries, serializer.DeserializeList(&in)); + snapshot_live_manifest_entries.entries_by_snapshot_[snapshot_id] = std::move(entries); + } + snapshot_live_manifest_entries.EvictIfNeeded(); + return snapshot_live_manifest_entries; +} + +void SnapshotLiveManifestEntries::EvictIfNeeded() { + 
size_t max_snapshots = NormalizeMaxSnapshots(max_snapshots_); + while (entries_by_snapshot_.size() > max_snapshots) { + entries_by_snapshot_.erase(entries_by_snapshot_.begin()); + } +} + +} // namespace paimon diff --git a/src/paimon/core/manifest/snapshot_live_manifest_entries.h b/src/paimon/core/manifest/snapshot_live_manifest_entries.h new file mode 100644 index 000000000..482d0911e --- /dev/null +++ b/src/paimon/core/manifest/snapshot_live_manifest_entries.h @@ -0,0 +1,65 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { +class Bytes; +class MemoryPool; +class MemorySegment; + +/// Live manifest entries retained for multiple snapshots. +/// +/// This value object owns merged live manifest entries by snapshot id. It does not own or access a +/// cache; callers are responsible for storing the serialized bytes in the cache layer. 
+class SnapshotLiveManifestEntries { + public: + struct Entry { + int64_t snapshot_id; + std::vector entries; + }; + + explicit SnapshotLiveManifestEntries(int32_t max_snapshots); + + std::optional LatestBeforeOrEqual(int64_t snapshot_id) const; + void Put(int64_t snapshot_id, std::vector&& entries); + + size_t Size() const; + + Result> Serialize(const std::shared_ptr& pool) const; + static Result Deserialize(const MemorySegment& segment, + int32_t max_snapshots, + const std::shared_ptr& pool); + + private: + void EvictIfNeeded(); + + std::map> entries_by_snapshot_; + int32_t max_snapshots_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/file_store_scan.cpp b/src/paimon/core/operation/file_store_scan.cpp index 876fa6276..1fe87f074 100644 --- a/src/paimon/core/operation/file_store_scan.cpp +++ b/src/paimon/core/operation/file_store_scan.cpp @@ -38,6 +38,7 @@ #include "paimon/core/manifest/manifest_file.h" #include "paimon/core/manifest/manifest_file_meta.h" #include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" #include "paimon/core/operation/metrics/scan_metrics.h" #include "paimon/core/partition/partition_info.h" #include "paimon/core/stats/simple_stats.h" @@ -45,6 +46,8 @@ #include "paimon/core/utils/duration.h" #include "paimon/core/utils/field_mapping.h" #include "paimon/core/utils/snapshot_manager.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_segment.h" #include "paimon/predicate/literal.h" #include "paimon/predicate/predicate.h" #include "paimon/predicate/predicate_builder.h" @@ -134,7 +137,27 @@ Result> FileStoreScan::CreatePlan() cons ReadManifests(&snapshot, &all_manifest_file_metas, &filtered_manifest_file_metas)); std::vector manifest_entries; - PAIMON_RETURN_NOT_OK(ReadManifestEntries(filtered_manifest_file_metas, &manifest_entries)); + LiveManifestCacheReadStats cache_stats; + const bool use_snapshot_live_manifest_cache = + snapshot.has_value() && 
scan_mode_ == ScanMode::ALL && + core_options_.GetScanManifestEntryCacheMaxSnapshots() > 0 && + core_options_.GetCache() != nullptr && snapshot_live_manifest_cache_key_ != nullptr && + !row_range_index_.has_value(); + if (use_snapshot_live_manifest_cache) { + PAIMON_RETURN_NOT_OK(ReadManifestEntriesWithCache(snapshot.value(), all_manifest_file_metas, + &manifest_entries, &cache_stats)); + std::vector filtered_entries; + filtered_entries.reserve(manifest_entries.size()); + for (auto& entry : manifest_entries) { + PAIMON_ASSIGN_OR_RAISE(bool keep, FilterManifestEntry(entry)); + if (keep) { + filtered_entries.emplace_back(std::move(entry)); + } + } + manifest_entries = std::move(filtered_entries); + } else { + PAIMON_RETURN_NOT_OK(ReadManifestEntries(filtered_manifest_file_metas, &manifest_entries)); + } PAIMON_ASSIGN_OR_RAISE(manifest_entries, PostFilterManifestEntries(std::move(manifest_entries))); @@ -170,6 +193,11 @@ Result> FileStoreScan::CreatePlan() cons metrics_->SetCounter(ScanMetrics::LAST_SCANNED_SNAPSHOT_ID, snapshot.has_value() ? snapshot.value().Id() : int64_t{0}); metrics_->SetCounter(ScanMetrics::LAST_SCANNED_MANIFESTS, filtered_manifest_file_metas.size()); + metrics_->SetCounter(ScanMetrics::LAST_MANIFEST_ENTRY_CACHE_HIT, cache_stats.hit ? 
1 : 0); + metrics_->SetCounter(ScanMetrics::LAST_MANIFEST_ENTRY_CACHE_INCREMENTAL_SNAPSHOTS, + cache_stats.incremental_snapshots); + metrics_->SetCounter(ScanMetrics::LAST_MANIFEST_ENTRY_CACHE_LOADED_MANIFESTS, + cache_stats.loaded_manifests); metrics_->SetCounter( ScanMetrics::LAST_SCAN_SKIPPED_TABLE_FILES, std::max(int64_t{0}, all_data_files - static_cast(manifest_entries.size()))); @@ -243,6 +271,32 @@ Status FileStoreScan::ReadFileEntries(const std::vector& manif return Status::OK(); } +Status FileStoreScan::ReadFileEntriesWithoutScanFilter( + const std::vector& manifest_metas, + std::vector* manifest_entries) const { + std::vector>>> futures; + for (const auto& meta : manifest_metas) { + auto read_meta_task = [this, meta]() -> Result> { + std::vector tmp_entries; + PAIMON_RETURN_NOT_OK(ReadManifestFileMetaWithoutScanFilter(meta, &tmp_entries)); + return tmp_entries; + }; + futures.push_back(Via(executor_.get(), read_meta_task)); + } + + auto unfiltered_entries = CollectAll(futures); + for (auto& entry_list : unfiltered_entries) { + if (!entry_list.ok()) { + return entry_list.status(); + } + manifest_entries->reserve(manifest_entries->size() + entry_list.value().size()); + for (auto& entry : entry_list.value()) { + manifest_entries->emplace_back(std::move(entry)); + } + } + return Status::OK(); +} + Status FileStoreScan::ReadManifestEntries(const std::vector& manifest_metas, std::vector* manifest_entries) const { if (scan_mode_ == ScanMode::ALL) { @@ -251,6 +305,92 @@ Status FileStoreScan::ReadManifestEntries(const std::vector& m return ReadAndNoMergeFileEntries(manifest_metas, manifest_entries); } +Status FileStoreScan::ReadManifestEntriesWithCache( + const Snapshot& snapshot, const std::vector& all_manifest_metas, + std::vector* manifest_entries, LiveManifestCacheReadStats* stats) const { + PAIMON_ASSIGN_OR_RAISE(SnapshotLiveManifestEntries cached_entries, + LoadSnapshotLiveManifestEntries()); + std::optional cached = + 
cached_entries.LatestBeforeOrEqual(snapshot.Id()); + if (cached && cached->snapshot_id == snapshot.Id()) { + stats->hit = true; + *manifest_entries = std::move(cached->entries); + return Status::OK(); + } + + if (cached) { + std::vector entries = std::move(cached->entries); + bool incremental_succeeded = true; + for (int64_t id = cached->snapshot_id + 1; id <= snapshot.Id(); id++) { + PAIMON_ASSIGN_OR_RAISE(bool exists, snapshot_manager_->SnapshotExists(id)); + if (!exists) { + incremental_succeeded = false; + break; + } + PAIMON_ASSIGN_OR_RAISE(Snapshot incremental_snapshot, + snapshot_manager_->LoadSnapshot(id)); + std::vector delta_metas; + PAIMON_RETURN_NOT_OK( + manifest_list_->ReadDeltaManifests(incremental_snapshot, &delta_metas)); + std::vector delta_entries; + PAIMON_RETURN_NOT_OK(ReadFileEntriesWithoutScanFilter(delta_metas, &delta_entries)); + entries.reserve(entries.size() + delta_entries.size()); + for (auto& entry : delta_entries) { + entries.emplace_back(std::move(entry)); + } + std::vector merged_entries; + PAIMON_RETURN_NOT_OK(FileEntry::MergeEntries(entries, &merged_entries)); + entries = std::move(merged_entries); + stats->incremental_snapshots++; + stats->loaded_manifests += static_cast(delta_metas.size()); + } + if (incremental_succeeded) { + *manifest_entries = entries; + cached_entries.Put(snapshot.Id(), std::move(entries)); + PAIMON_RETURN_NOT_OK(StoreSnapshotLiveManifestEntries(cached_entries)); + return Status::OK(); + } + } + + *stats = LiveManifestCacheReadStats(); + PAIMON_RETURN_NOT_OK( + ReadAndMergeFileEntriesWithoutScanFilter(all_manifest_metas, manifest_entries)); + stats->loaded_manifests = static_cast(all_manifest_metas.size()); + std::vector cache_entries = *manifest_entries; + cached_entries.Put(snapshot.Id(), std::move(cache_entries)); + PAIMON_RETURN_NOT_OK(StoreSnapshotLiveManifestEntries(cached_entries)); + return Status::OK(); +} + +Result FileStoreScan::LoadSnapshotLiveManifestEntries() const { + auto supplier = [](const 
std::shared_ptr&) -> Result> { + return std::shared_ptr(); + }; + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr cache_value, + core_options_.GetCache()->Get(snapshot_live_manifest_cache_key_, supplier)); + if (!cache_value) { + return SnapshotLiveManifestEntries(core_options_.GetScanManifestEntryCacheMaxSnapshots()); + } + return SnapshotLiveManifestEntries::Deserialize( + cache_value->GetSegment(), core_options_.GetScanManifestEntryCacheMaxSnapshots(), pool_); +} + +Status FileStoreScan::StoreSnapshotLiveManifestEntries( + const SnapshotLiveManifestEntries& entries) const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr bytes, entries.Serialize(pool_)); + auto cache_value = std::make_shared(MemorySegment::Wrap(bytes), CacheCallback()); + return core_options_.GetCache()->Put(snapshot_live_manifest_cache_key_, cache_value); +} + +Status FileStoreScan::ReadAndMergeFileEntriesWithoutScanFilter( + const std::vector& manifest_metas, + std::vector* merged_entries) const { + std::vector unmerged_entries; + PAIMON_RETURN_NOT_OK(ReadFileEntriesWithoutScanFilter(manifest_metas, &unmerged_entries)); + return FileEntry::MergeEntries(unmerged_entries, merged_entries); +} + Status FileStoreScan::ReadAndMergeFileEntries(const std::vector& manifest_metas, std::vector* merged_entries) const { std::vector unmerged_entries; @@ -319,37 +459,43 @@ bool FileStoreScan::FilterManifestByRowRanges(const ManifestFileMeta& manifest) Status FileStoreScan::ReadManifestFileMeta(const ManifestFileMeta& manifest, std::vector* entries) const { - auto filter = [&](const ManifestEntry& entry) -> Result { - if (partition_filter_) { - PAIMON_ASSIGN_OR_RAISE(bool res, - partition_filter_->Test(partition_schema_, entry.Partition())); - if (!res) { - return false; - } - } - if (only_read_real_buckets_ && entry.Bucket() < 0) { - return false; - } - if (bucket_filter_ != std::nullopt && entry.Bucket() != bucket_filter_.value()) { - return false; - } - if (level_filter_ != nullptr && !level_filter_(entry.Level())) { - 
return false; - } - return true; - }; std::vector unfiltered_entries; - PAIMON_RETURN_NOT_OK(manifest_file_->Read(manifest.FileName(), filter, &unfiltered_entries)); + PAIMON_RETURN_NOT_OK(manifest_file_->Read( + manifest.FileName(), + [this](const ManifestEntry& entry) -> Result { return FilterManifestEntry(entry); }, + &unfiltered_entries)); entries->reserve(entries->size() + unfiltered_entries.size()); for (auto& entry : unfiltered_entries) { - PAIMON_ASSIGN_OR_RAISE(bool res, FilterByStats(entry)); - if (res) { - entries->emplace_back(std::move(entry)); - } + entries->emplace_back(std::move(entry)); } return Status::OK(); } +Status FileStoreScan::ReadManifestFileMetaWithoutScanFilter( + const ManifestFileMeta& manifest, std::vector* entries) const { + return manifest_file_->Read(manifest.FileName(), /*filter=*/nullptr, entries); +} + +Result FileStoreScan::FilterManifestEntry(const ManifestEntry& entry) const { + if (partition_filter_) { + PAIMON_ASSIGN_OR_RAISE(bool res, + partition_filter_->Test(partition_schema_, entry.Partition())); + if (!res) { + return false; + } + } + if (only_read_real_buckets_ && entry.Bucket() < 0) { + return false; + } + if (bucket_filter_ != std::nullopt && entry.Bucket() != bucket_filter_.value()) { + return false; + } + if (level_filter_ != nullptr && !level_filter_(entry.Level())) { + return false; + } + return FilterByStats(entry); +} + Status FileStoreScan::SplitAndSetFilter(const std::vector& partition_keys, const std::shared_ptr& arrow_schema, const std::shared_ptr& scan_filters) { diff --git a/src/paimon/core/operation/file_store_scan.h b/src/paimon/core/operation/file_store_scan.h index 46a06e4a7..4ddc9335d 100644 --- a/src/paimon/core/operation/file_store_scan.h +++ b/src/paimon/core/operation/file_store_scan.h @@ -21,12 +21,12 @@ #include #include #include -#include #include #include #include #include +#include "paimon/cache/cache.h" #include "paimon/common/data/binary_row.h" #include 
"paimon/common/metrics/metrics_impl.h" #include "paimon/common/predicate/compound_predicate_impl.h" @@ -41,6 +41,7 @@ #include "paimon/core/manifest/manifest_file_meta.h" #include "paimon/core/manifest/manifest_list.h" #include "paimon/core/manifest/partition_entry.h" +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/snapshot.h" #include "paimon/core/table/source/scan_mode.h" @@ -120,6 +121,12 @@ class FileStoreScan { return this; } + FileStoreScan* WithSnapshotLiveManifestEntriesCacheKey( + const std::shared_ptr& cache_key) { + snapshot_live_manifest_cache_key_ = cache_key; + return this; + } + virtual FileStoreScan* EnableValueFilter() { return this; } @@ -237,6 +244,23 @@ class FileStoreScan { Status ReadManifestEntries(const std::vector& manifest_metas, std::vector* manifest_entries) const; + struct LiveManifestCacheReadStats { + bool hit = false; + int64_t incremental_snapshots = 0; + int64_t loaded_manifests = 0; + }; + + Status ReadManifestEntriesWithCache(const Snapshot& snapshot, + const std::vector& all_manifest_metas, + std::vector* manifest_entries, + LiveManifestCacheReadStats* stats) const; + Result LoadSnapshotLiveManifestEntries() const; + Status StoreSnapshotLiveManifestEntries(const SnapshotLiveManifestEntries& entries) const; + + Status ReadAndMergeFileEntriesWithoutScanFilter( + const std::vector& manifest_metas, + std::vector* merged_entries) const; + Status ReadAndMergeFileEntries(const std::vector& manifest_metas, std::vector* merged_entries) const; @@ -246,6 +270,9 @@ class FileStoreScan { Status ReadFileEntries(const std::vector& manifest_metas, std::vector* manifest_entries) const; + Status ReadFileEntriesWithoutScanFilter(const std::vector& manifest_metas, + std::vector* manifest_entries) const; + Result FilterManifestFileMeta(const ManifestFileMeta& manifest) const; bool FilterManifestByRowRanges(const ManifestFileMeta& manifest) const; @@ -253,6 +280,11 
@@ class FileStoreScan { Status ReadManifestFileMeta(const ManifestFileMeta& manifest, std::vector* entries) const; + Status ReadManifestFileMetaWithoutScanFilter(const ManifestFileMeta& manifest, + std::vector* entries) const; + + Result FilterManifestEntry(const ManifestEntry& entry) const; + protected: std::shared_ptr pool_; std::shared_ptr schema_manager_; @@ -265,7 +297,6 @@ class FileStoreScan { CoreOptions core_options_; private: - mutable std::mutex lock_; bool only_read_real_buckets_ = false; std::shared_ptr snapshot_manager_; std::shared_ptr manifest_list_; @@ -277,5 +308,6 @@ class FileStoreScan { std::function level_filter_; std::optional specified_snapshot_; std::shared_ptr metrics_; + std::shared_ptr snapshot_live_manifest_cache_key_; }; } // namespace paimon diff --git a/src/paimon/core/operation/file_store_scan_test.cpp b/src/paimon/core/operation/file_store_scan_test.cpp index d2ef0db13..0bf64acaa 100644 --- a/src/paimon/core/operation/file_store_scan_test.cpp +++ b/src/paimon/core/operation/file_store_scan_test.cpp @@ -16,13 +16,17 @@ #include "paimon/core/operation/file_store_scan.h" +#include + #include "arrow/type.h" #include "gtest/gtest.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_kind.h" #include "paimon/core/manifest/file_source.h" +#include "paimon/core/manifest/snapshot_live_manifest_entries.h" #include "paimon/core/stats/simple_stats.h" #include "paimon/data/timestamp.h" +#include "paimon/memory/memory_segment.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { @@ -183,4 +187,84 @@ TEST_F(FileStoreScanTest, TestFilterManifestByRowRanges) { file_store_scan->WithRowRangeIndex(row_range_index); ASSERT_TRUE(file_store_scan->FilterManifestByRowRanges(manifest2)); } + +TEST_F(FileStoreScanTest, TestSnapshotLiveManifestEntries) { + SnapshotLiveManifestEntries entries(/*max_snapshots=*/2); + std::vector snapshot1; + ASSERT_OK_AND_ASSIGN( + auto file1, + DataFileMeta::ForAppend("file-1", 
/*file_size=*/10, /*row_count=*/1, + SimpleStats::EmptyStats(), /*min_sequence_number=*/0, + /*max_sequence_number=*/0, /*schema_id=*/0, + /*file_source=*/std::nullopt, /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + snapshot1.emplace_back(FileKind::Add(), BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, file1); + entries.Put(/*snapshot_id=*/1, std::move(snapshot1)); + + ASSERT_EQ(entries.Size(), 1); + auto hit = entries.LatestBeforeOrEqual(/*snapshot_id=*/1); + ASSERT_TRUE(hit); + ASSERT_EQ(hit->snapshot_id, 1); + ASSERT_EQ(hit->entries.size(), 1); + ASSERT_EQ(hit->entries[0].FileName(), "file-1"); + auto latest_before_2 = entries.LatestBeforeOrEqual(/*snapshot_id=*/2); + ASSERT_TRUE(latest_before_2); + ASSERT_EQ(latest_before_2->snapshot_id, 1); + + std::vector snapshot3; + ASSERT_OK_AND_ASSIGN( + auto file3, + DataFileMeta::ForAppend("file-3", /*file_size=*/10, /*row_count=*/1, + SimpleStats::EmptyStats(), /*min_sequence_number=*/0, + /*max_sequence_number=*/0, /*schema_id=*/0, + /*file_source=*/std::nullopt, /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt)); + snapshot3.emplace_back(FileKind::Add(), BinaryRow::EmptyRow(), /*bucket=*/0, + /*total_buckets=*/1, file3); + entries.Put(/*snapshot_id=*/3, std::move(snapshot3)); + + auto latest_before_4 = entries.LatestBeforeOrEqual(/*snapshot_id=*/4); + ASSERT_TRUE(latest_before_4); + ASSERT_EQ(latest_before_4->snapshot_id, 3); + auto latest_before_3 = entries.LatestBeforeOrEqual(/*snapshot_id=*/3); + ASSERT_TRUE(latest_before_3); + ASSERT_EQ(latest_before_3->snapshot_id, 3); + + entries.Put(/*snapshot_id=*/5, {}); + ASSERT_EQ(entries.Size(), 2); + auto before_first = entries.LatestBeforeOrEqual(/*snapshot_id=*/1); + ASSERT_FALSE(before_first); + auto kept_snapshot3 = entries.LatestBeforeOrEqual(/*snapshot_id=*/3); + 
ASSERT_TRUE(kept_snapshot3); + auto kept_snapshot5 = entries.LatestBeforeOrEqual(/*snapshot_id=*/5); + ASSERT_TRUE(kept_snapshot5); +} + +TEST_F(FileStoreScanTest, TestSnapshotLiveManifestEntriesSerialization) { + SnapshotLiveManifestEntries entries(/*max_snapshots=*/2); + entries.Put(/*snapshot_id=*/1, {}); + entries.Put(/*snapshot_id=*/3, {}); + + ASSERT_OK_AND_ASSIGN(auto bytes, entries.Serialize(GetDefaultPool())); + ASSERT_OK_AND_ASSIGN(auto deserialized, SnapshotLiveManifestEntries::Deserialize( + MemorySegment::Wrap(bytes), + /*max_snapshots=*/2, GetDefaultPool())); + ASSERT_EQ(deserialized.Size(), 2); + auto hit = deserialized.LatestBeforeOrEqual(/*snapshot_id=*/4); + ASSERT_TRUE(hit); + ASSERT_EQ(hit->snapshot_id, 3); + + ASSERT_OK_AND_ASSIGN(auto evicted_deserialized, SnapshotLiveManifestEntries::Deserialize( + MemorySegment::Wrap(bytes), + /*max_snapshots=*/1, GetDefaultPool())); + ASSERT_EQ(evicted_deserialized.Size(), 1); + auto before_first = evicted_deserialized.LatestBeforeOrEqual(/*snapshot_id=*/1); + ASSERT_FALSE(before_first); + auto kept = evicted_deserialized.LatestBeforeOrEqual(/*snapshot_id=*/3); + ASSERT_TRUE(kept); + ASSERT_EQ(kept->snapshot_id, 3); +} } // namespace paimon::test diff --git a/src/paimon/core/operation/metrics/scan_metrics.h b/src/paimon/core/operation/metrics/scan_metrics.h index a414314ff..674b06738 100644 --- a/src/paimon/core/operation/metrics/scan_metrics.h +++ b/src/paimon/core/operation/metrics/scan_metrics.h @@ -26,6 +26,11 @@ class ScanMetrics { static constexpr char SCAN_DURATION[] = "scanDuration"; static constexpr char LAST_SCANNED_SNAPSHOT_ID[] = "lastScannedSnapshotId"; static constexpr char LAST_SCANNED_MANIFESTS[] = "lastScannedManifests"; + static constexpr char LAST_MANIFEST_ENTRY_CACHE_HIT[] = "lastManifestEntryCacheHit"; + static constexpr char LAST_MANIFEST_ENTRY_CACHE_INCREMENTAL_SNAPSHOTS[] = + "lastManifestEntryCacheIncrementalSnapshots"; + static constexpr char 
LAST_MANIFEST_ENTRY_CACHE_LOADED_MANIFESTS[] = + "lastManifestEntryCacheLoadedManifests"; static constexpr char LAST_SCAN_SKIPPED_TABLE_FILES[] = "lastScanSkippedTableFiles"; static constexpr char LAST_SCAN_RESULTED_TABLE_FILES[] = "lastScanResultedTableFiles"; }; diff --git a/src/paimon/core/table/source/table_scan.cpp b/src/paimon/core/table/source/table_scan.cpp index 7869cf8b5..3d63195aa 100644 --- a/src/paimon/core/table/source/table_scan.cpp +++ b/src/paimon/core/table/source/table_scan.cpp @@ -17,12 +17,14 @@ #include "paimon/table/source/table_scan.h" #include +#include #include #include #include #include #include "fmt/format.h" +#include "paimon/cache/cache.h" #include "paimon/common/predicate/predicate_validator.h" #include "paimon/common/types/data_field.h" #include "paimon/common/utils/fields_comparator.h" @@ -71,6 +73,17 @@ namespace { class TableScanImpl { public: + static std::unique_ptr WithSnapshotLiveManifestEntriesCacheKey( + std::unique_ptr&& scan, const ScanContext* context, + const CoreOptions& core_options) { + if (core_options.GetScanManifestEntryCacheMaxSnapshots() > 0 && + core_options.GetCache() != nullptr) { + scan->WithSnapshotLiveManifestEntriesCacheKey(CacheKey::ForSnapshotLiveManifestEntries( + context->GetPath(), BranchManager::NormalizeBranch(core_options.GetBranch()))); + } + return std::move(scan); + } + static Result> CreateFileStoreScan( const std::shared_ptr& path_factory, const std::shared_ptr& arrow_schema, @@ -95,19 +108,29 @@ class TableScanImpl { ManifestFile::Create(fs, manifest_file_format, core_options.GetManifestCompression(), path_factory, core_options.GetManifestTargetFileSize(), memory_pool, core_options, partition_schema)); + std::unique_ptr scan; if (table_schema->PrimaryKeys().empty()) { if (core_options.DataEvolutionEnabled()) { - return DataEvolutionFileStoreScan::Create( - snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, - arrow_schema, context->GetScanFilters(), core_options, 
executor, memory_pool); + PAIMON_ASSIGN_OR_RAISE( + scan, DataEvolutionFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, + table_schema, arrow_schema, context->GetScanFilters(), core_options, + executor, memory_pool)); + } else { + PAIMON_ASSIGN_OR_RAISE( + scan, AppendOnlyFileStoreScan::Create( + snapshot_manager, schema_manager, manifest_list, manifest_file, + table_schema, arrow_schema, context->GetScanFilters(), core_options, + executor, memory_pool)); } - return AppendOnlyFileStoreScan::Create( - snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, - arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + } else { + PAIMON_ASSIGN_OR_RAISE( + scan, KeyValueFileStoreScan::Create(snapshot_manager, schema_manager, manifest_list, + manifest_file, table_schema, arrow_schema, + context->GetScanFilters(), core_options, + executor, memory_pool)); } - return KeyValueFileStoreScan::Create( - snapshot_manager, schema_manager, manifest_list, manifest_file, table_schema, - arrow_schema, context->GetScanFilters(), core_options, executor, memory_pool); + return WithSnapshotLiveManifestEntriesCacheKey(std::move(scan), context, core_options); } static Result> CreateSplitGenerator(