diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java index 9dca62508f65..a75e4d92d051 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java @@ -43,7 +43,6 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.FormatTable; -import org.apache.paimon.table.format.predicate.PredicateUtils; import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableScan; @@ -122,7 +121,10 @@ public List listPartitionEntries() { new Path(table.location()), table.partitionKeys().size(), table.partitionKeys(), - coreOptions.formatTablePartitionOnlyValueInPath()); + coreOptions.formatTablePartitionOnlyValueInPath(), + null, + table.partitionType(), + table.defaultPartName()); List partitionEntries = new ArrayList<>(); for (Pair, Path> partition2Path : partition2Paths) { BinaryRow row = toPartitionRow(partition2Path.getKey()); @@ -199,17 +201,11 @@ List, Path>> findPartitions() { // search paths with partition filter optimization // This will prune partition directories early during traversal, // which is especially important for cloud storage like OSS/S3 - Map partitionPredicates = new HashMap<>(); Optional predicate = extractPartitionPredicate(partitionFilter); LOG.debug( "Extracted predicate for format table {} partition pruning: {}", table.name(), predicate.orElse(null)); - if (predicate.isPresent()) { - partitionPredicates = - PredicateUtils.splitPartitionPredicate( - table.partitionType(), predicate.get()); - } Pair scanPathAndLevel = computeScanPathAndLevel( @@ -224,7 +220,7 @@ List, Path>> findPartitions() { scanPathAndLevel.getRight(), table.partitionKeys(), onlyValueInPath, - 
partitionPredicates, + predicate.orElse(null), table.partitionType(), table.defaultPartName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java deleted file mode 100644 index 86ef0a13ad5c..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/predicate/PredicateUtils.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.format.predicate; - -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.types.RowType; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - -import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; -import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames; - -/** Utility methods for working with {@link Predicate}s. 
*/ -public class PredicateUtils { - - // Use splitAnd split the predicate, then group them by the partition fields. - public static Map splitPartitionPredicate( - RowType partitionType, Predicate predicate) { - int[] fieldMap = new int[partitionType.getFields().size()]; - Arrays.fill(fieldMap, 0); - List predicates = PredicateBuilder.splitAnd(predicate); - Set partitionFieldNames = new HashSet<>(partitionType.getFieldNames()); - Map result = new HashMap<>(); - - for (Predicate sub : predicates) { - // Collect all field names referenced by this predicate - Set referencedFields = collectFieldNames(sub); - Optional transformed = transformFieldMapping(sub, fieldMap); - if (transformed.isPresent() && referencedFields.size() == 1) { - Predicate child = transformed.get(); - // Only include predicates that reference exactly one partition field - String fieldName = referencedFields.iterator().next(); - if (partitionFieldNames.contains(fieldName)) { - if (result.containsKey(fieldName)) { - // Combine with existing predicate using AND - result.put(fieldName, PredicateBuilder.and(result.get(fieldName), child)); - } else { - result.put(fieldName, child); - } - } - } - } - - return result; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java index fc2fe23e059f..fb7baf5b06b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionPathUtils.java @@ -19,10 +19,17 @@ package org.apache.paimon.utils; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.apache.paimon.predicate.And; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.FieldRef; +import 
org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Or; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; @@ -36,6 +43,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -286,7 +294,7 @@ public static List, Path>> searchPartSpecAndP partitionNumber, partitionKeys, onlyValueInPath, - Collections.emptyMap(), + (Predicate) null, null, null); } @@ -297,7 +305,7 @@ public static List, Path>> searchPartSpecAndP int partitionNumber, List partitionKeys, boolean onlyValueInPath, - Map partitionFilter, + @Nullable Predicate partitionFilter, @Nullable RowType partitionType, @Nullable String defaultPartValue) { FileStatus[] generatedParts = @@ -312,8 +320,7 @@ public static List, Path>> searchPartSpecAndP defaultPartValue); List, Path>> ret = new ArrayList<>(); for (FileStatus part : generatedParts) { - // ignore hidden file - if (isHiddenFile(part)) { + if (isHiddenFile(part, onlyValueInPath, defaultPartValue)) { continue; } if (onlyValueInPath) { @@ -340,16 +347,22 @@ private static FileStatus[] getFileStatusRecurse( FileIO fileIO, List partitionKeys, boolean onlyValueInPath, - Map partitionFilter, + @Nullable Predicate partitionFilter, @Nullable RowType partitionType, @Nullable String defaultPartValue) { ArrayList result = new ArrayList<>(); + // Only predicate-referenced levels are parsed/pruned. + Set referencedFields = + partitionFilter == null + ? Collections.emptySet() + : PredicateVisitor.collectFieldNames(partitionFilter); + GenericRow values = + partitionType == null ? 
null : new GenericRow(partitionType.getFieldCount()); + try { FileStatus fileStatus = fileIO.getFileStatus(path); - // Calculate the starting offset when we begin from a prefix path - // For example, if partitionKeys = [ds, hr] and expectLevel = 1 (only hr remaining), - // then levelOffset = 2 - 1 = 1, so we access partitionKeys[1] for level 0 + // Skip partition levels already fixed by the scan-path prefix. int levelOffset = partitionKeys.size() - expectLevel; listStatusRecursively( fileIO, @@ -360,9 +373,11 @@ private static FileStatus[] getFileStatusRecurse( partitionKeys, onlyValueInPath, partitionFilter, + referencedFields, partitionType, defaultPartValue, - levelOffset); + levelOffset, + values); } catch (FileNotFoundException e) { return new FileStatus[0]; } catch (IOException e) { @@ -380,12 +395,14 @@ private static void listStatusRecursively( List results, List partitionKeys, boolean onlyValueInPath, - Map partitionFilter, + @Nullable Predicate partitionFilter, + Set referencedFields, @Nullable RowType partitionType, @Nullable String defaultPartValue, - int levelOffset) + int levelOffset, + @Nullable GenericRow values) throws IOException { - if (isHiddenFile(fileStatus.getPath())) { + if (isHiddenFile(fileStatus, onlyValueInPath, defaultPartValue)) { return; } @@ -396,69 +413,30 @@ private static void listStatusRecursively( if (fileStatus.isDir()) { for (FileStatus stat : fileIO.listStatus(fileStatus.getPath())) { - // Calculate the actual partition key index considering the level offset - // When starting from a prefix path, levelOffset accounts for already-traversed - // levels int partitionKeyIndex = levelOffset + level; - - // Apply partition filter if available - if (partitionFilter.containsKey(partitionKeys.get(partitionKeyIndex)) - && partitionType != null) { - - Predicate partitionPredicate = - partitionFilter.get(partitionKeys.get(partitionKeyIndex)); - // Extract the partition value from the directory name - String dirName = 
stat.getPath().getName(); - String partitionKey = partitionKeys.get(partitionKeyIndex); - String partitionValue; - - if (onlyValueInPath) { - partitionValue = unescapePathName(dirName); - } else { - // Parse key=value format - Matcher m = PARTITION_NAME_PATTERN.matcher(dirName); - if (m.matches()) { - String key = unescapePathName(m.group(1)); - if (!key.equals(partitionKey)) { - // Key doesn't match expected partition key, skip filtering - partitionValue = null; - } else { - partitionValue = unescapePathName(m.group(2)); - } - } else { - // Not a valid partition directory format - partitionValue = null; - } - } - + String partitionKey = partitionKeys.get(partitionKeyIndex); + + // Bind the current partition value and prune only when the partially bound + // predicate is provably false. Unreferenced or unparseable levels are descended + // without pruning. + if (partitionFilter != null + && partitionType != null + && values != null + && referencedFields.contains(partitionKey)) { + String partitionValue = + parsePartitionValue( + stat.getPath().getName(), partitionKey, onlyValueInPath); if (partitionValue != null) { - // Convert the partition value to internal format Object internalValue = defaultPartValue != null && defaultPartValue.equals(partitionValue) ? 
null : castFromString( partitionValue, partitionType.getTypeAt(partitionKeyIndex)); - - GenericRow partialRow = GenericRow.of(internalValue); - if (!partitionPredicate.test(partialRow)) { + values.setField(partitionKeyIndex, internalValue); + if (!mightMatch(partitionFilter, levelOffset, partitionKeyIndex, values)) { continue; } - - // Pass the accumulated values to the next level - listStatusRecursively( - fileIO, - stat, - level + 1, - expectLevel, - results, - partitionKeys, - onlyValueInPath, - partitionFilter, - partitionType, - defaultPartValue, - levelOffset); - continue; } } @@ -471,19 +449,91 @@ private static void listStatusRecursively( partitionKeys, onlyValueInPath, partitionFilter, + referencedFields, partitionType, defaultPartValue, - levelOffset); + levelOffset, + values); + } + } + } + + /** Returns the partition value from a directory name, or {@code null} when it can't be used. */ + @Nullable + private static String parsePartitionValue( + String dirName, String partitionKey, boolean onlyValueInPath) { + if (onlyValueInPath) { + return unescapePathName(dirName); + } + Matcher m = PARTITION_NAME_PATTERN.matcher(dirName); + if (m.matches()) { + String key = unescapePathName(m.group(1)); + if (!key.equals(partitionKey)) { + return null; + } + return unescapePathName(m.group(2)); + } + return null; + } + + /** + * Returns whether a subtree might still match the partition predicate. + * + *

<p>{@code values} holds the currently bound partition values in {@code [minIdx, maxIdx]}. + * Indices below {@code minIdx} belong to the scan-path prefix; indices above {@code maxIdx} are + * not known yet. Returning {@code false} means the subtree can be pruned safely. + */ + static boolean mightMatch( + @Nullable Predicate predicate, int minIdx, int maxIdx, InternalRow values) { + if (predicate == null) { + return true; + } + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compound = (CompoundPredicate) predicate; + if (compound.function() instanceof Or) { + for (Predicate child : compound.children()) { + if (mightMatch(child, minIdx, maxIdx, values)) { + return true; + } + } + return false; } + if (compound.function() instanceof And) { + for (Predicate child : compound.children()) { + if (!mightMatch(child, minIdx, maxIdx, values)) { + return false; + } + } + return true; + } + return true; } + if (predicate instanceof LeafPredicate) { + for (Object input : ((LeafPredicate) predicate).transform().inputs()) { + if (input instanceof FieldRef) { + int idx = ((FieldRef) input).index(); + if (idx < minIdx || idx > maxIdx) { + return true; + } + } + } + return predicate.test(values); + } + // Unknown predicate node: be conservative.
+ return true; } - private static boolean isHiddenFile(FileStatus fileStatus) { - return isHiddenFile(fileStatus.getPath()); + private static boolean isHiddenFile( + FileStatus fileStatus, boolean onlyValueInPath, @Nullable String defaultPartValue) { + return isHiddenFile(fileStatus.getPath(), onlyValueInPath, defaultPartValue); } - private static boolean isHiddenFile(Path path) { + private static boolean isHiddenFile( + Path path, boolean onlyValueInPath, @Nullable String defaultPartValue) { String name = path.getName(); + if (onlyValueInPath && defaultPartValue != null && defaultPartValue.equals(name)) { + return false; + } return name.startsWith("_") || name.startsWith("."); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java index 0584577a1a21..18f22c8112cd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.format.csv.CsvOptions; @@ -1018,4 +1019,313 @@ public FileStatus[] listStatus(Path path) throws IOException { .collect(java.util.stream.Collectors.toList()); assertEquals(Arrays.asList("4", "5", "6", "7", "8", "9"), months); } + + @TestTemplate + void testFindPartitionsWithOrCrossFieldPredicate() throws IOException { + Path tableLocation = new Path(tmpPath.toUri()); + createYearMonthPartitionDirs(LocalFileIO.create(), tableLocation); + + AtomicInteger listCount = new AtomicInteger(0); + FormatTable formatTable = + createYearMonthFormatTable(countingFileIO(listCount), tableLocation); + + // (year = 2024 AND month < 6) OR (year 
= 2025 AND month >= 6): a cross-field OR that the + // old per-field split model could not express, so it used to fall back to listing every + // year directory. + PredicateBuilder builder = new PredicateBuilder(formatTable.partitionType()); + Predicate orPredicate = + PredicateBuilder.or( + PredicateBuilder.and(builder.equal(0, 2024), builder.lessThan(1, 6)), + PredicateBuilder.and(builder.equal(0, 2025), builder.greaterOrEqual(1, 6))); + PartitionPredicate partitionFilter = + PartitionPredicate.fromPredicate(formatTable.partitionType(), orPredicate); + + FormatTableScan scan = new FormatTableScan(formatTable, partitionFilter, null); + List, Path>> result = scan.findPartitions(); + + // Only year=2024 and year=2025 are descended: root + 2 year dirs = 3 list calls. + assertEquals(3, listCount.get()); + // 2024 months 1-5 (5) + 2025 months 6-12 (7) = 12 partitions. + assertEquals(12, result.size()); + for (Pair, Path> pair : result) { + int year = Integer.parseInt(pair.getKey().get("year")); + int month = Integer.parseInt(pair.getKey().get("month")); + assertTrue( + (year == 2024 && month < 6) || (year == 2025 && month >= 6), + "unexpected partition " + year + "-" + month); + } + } + + @TestTemplate + void testFindPartitionsWithNestedAndOr() throws IOException { + Path tableLocation = new Path(tmpPath.toUri()); + createYearMonthPartitionDirs(LocalFileIO.create(), tableLocation); + + AtomicInteger listCount = new AtomicInteger(0); + FormatTable formatTable = + createYearMonthFormatTable(countingFileIO(listCount), tableLocation); + + // year > 2022 AND (month = 1 OR month = 12) + PredicateBuilder builder = new PredicateBuilder(formatTable.partitionType()); + Predicate predicate = + PredicateBuilder.and( + builder.greaterThan(0, 2022), + PredicateBuilder.or(builder.equal(1, 1), builder.equal(1, 12))); + PartitionPredicate partitionFilter = + PartitionPredicate.fromPredicate(formatTable.partitionType(), predicate); + + FormatTableScan scan = new 
FormatTableScan(formatTable, partitionFilter, null); + List, Path>> result = scan.findPartitions(); + + // year=2022 pruned; 2023-2026 descended: root + 4 year dirs = 5 list calls. + assertEquals(5, listCount.get()); + // 4 years x months {1, 12} = 8 partitions. + assertEquals(8, result.size()); + for (Pair, Path> pair : result) { + int year = Integer.parseInt(pair.getKey().get("year")); + int month = Integer.parseInt(pair.getKey().get("month")); + assertTrue( + year > 2022 && (month == 1 || month == 12), + "unexpected partition " + year + "-" + month); + } + } + + @TestTemplate + void testFindPartitionsWithOrSingleField() throws IOException { + Path tableLocation = new Path(tmpPath.toUri()); + createYearMonthPartitionDirs(LocalFileIO.create(), tableLocation); + + AtomicInteger listCount = new AtomicInteger(0); + FormatTable formatTable = + createYearMonthFormatTable(countingFileIO(listCount), tableLocation); + + // year = 2023 OR year = 2025 (single-field OR now prunes too) + PredicateBuilder builder = new PredicateBuilder(formatTable.partitionType()); + Predicate predicate = PredicateBuilder.or(builder.equal(0, 2023), builder.equal(0, 2025)); + PartitionPredicate partitionFilter = + PartitionPredicate.fromPredicate(formatTable.partitionType(), predicate); + + FormatTableScan scan = new FormatTableScan(formatTable, partitionFilter, null); + List, Path>> result = scan.findPartitions(); + + // Only year=2023 and year=2025 descended: root + 2 = 3 list calls. + assertEquals(3, listCount.get()); + // 2 years x 12 months = 24 partitions. 
+ assertEquals(24, result.size()); + List years = + result.stream() + .map(pair -> pair.getKey().get("year")) + .distinct() + .sorted() + .collect(java.util.stream.Collectors.toList()); + assertEquals(Arrays.asList("2023", "2025"), years); + } + + @TestTemplate + void testFindPartitionsWithPrefixAndOrPredicate() throws IOException { + Path tableLocation = new Path(tmpPath.toUri()); + createYearMonthPartitionDirs(LocalFileIO.create(), tableLocation); + + AtomicInteger listCount = new AtomicInteger(0); + FormatTable formatTable = + createYearMonthFormatTable(countingFileIO(listCount), tableLocation); + + // year = 2024 is pushed into the scan-path prefix. The remaining OR on month must not read + // the unbound prefix slot and accidentally prune the whole subtree. + PredicateBuilder builder = new PredicateBuilder(formatTable.partitionType()); + Predicate predicate = + PredicateBuilder.and( + builder.equal(0, 2024), + PredicateBuilder.or(builder.equal(1, 1), builder.equal(1, 12))); + PartitionPredicate partitionFilter = + PartitionPredicate.fromPredicate(formatTable.partitionType(), predicate); + + FormatTableScan scan = new FormatTableScan(formatTable, partitionFilter, null); + List, Path>> result = scan.findPartitions(); + + // The equality prefix narrows the scan to year=2024, so only its month directory is listed. 
+ assertEquals(1, listCount.get()); + assertEquals(2, result.size()); + List months = + result.stream() + .map(pair -> pair.getKey().get("month")) + .distinct() + .sorted() + .collect(java.util.stream.Collectors.toList()); + assertEquals(Arrays.asList("1", "12"), months); + assertTrue(result.stream().allMatch(pair -> "2024".equals(pair.getKey().get("year")))); + } + + @TestTemplate + void testFindPartitionsWithNullDefaultPartitionInOrPredicate() throws IOException { + Path tableLocation = new Path(tmpPath.toUri()); + + AtomicInteger listCount = new AtomicInteger(0); + FormatTable formatTable = createDtHourFormatTable(countingFileIO(listCount), tableLocation); + createDtHourPartitionDirs( + LocalFileIO.create(), + tableLocation, + formatTable.defaultPartName(), + Arrays.asList( + Pair.of(formatTable.defaultPartName(), "10"), + Pair.of("20260625", "10"), + Pair.of("20260624", "20"), + Pair.of("20260101", "10"))); + + // The default partition name should be treated as null while pruning. + PredicateBuilder builder = new PredicateBuilder(formatTable.partitionType()); + Predicate predicate = + PredicateBuilder.or( + builder.isNull(0), + PredicateBuilder.and( + builder.equal(0, BinaryString.fromString("20260625")), + builder.equal(1, BinaryString.fromString("10")))); + PartitionPredicate partitionFilter = + PartitionPredicate.fromPredicate(formatTable.partitionType(), predicate); + + FormatTableScan scan = new FormatTableScan(formatTable, partitionFilter, null); + List, Path>> result = scan.findPartitions(); + + // Only the default dt partition and dt=20260625 are descended: root + 2 = 3 list calls. 
+ assertEquals(3, listCount.get()); + assertEquals(2, result.size()); + assertContainsDefaultAndConcreteDt(result, formatTable.defaultPartName()); + } + + @TestTemplate + void testListPartitionEntriesWithNullDefaultPartitionInOnlyValuePath() throws IOException { + Path tableLocation = new Path(tmpPath.toUri()); + FormatTable formatTable = createDtHourFormatTable(LocalFileIO.create(), tableLocation); + createDtHourPartitionDirs( + LocalFileIO.create(), + tableLocation, + formatTable.defaultPartName(), + Arrays.asList( + Pair.of(formatTable.defaultPartName(), "10"), Pair.of("20260625", "10"))); + + FormatTableScan scan = new FormatTableScan(formatTable, null, null); + List entries = scan.listPartitionEntries(); + + assertEquals(2, entries.size()); + assertTrue(entries.stream().anyMatch(entry -> entry.partition().isNullAt(0))); + assertTrue( + entries.stream() + .anyMatch( + entry -> + !entry.partition().isNullAt(0) + && BinaryString.fromString("20260625") + .equals(entry.partition().getString(0)))); + } + + private void createYearMonthPartitionDirs(LocalFileIO fileIO, Path tableLocation) + throws IOException { + for (int year = 2022; year <= 2026; year++) { + String partPath = enablePartitionValueOnly ? String.valueOf(year) : "year=" + year; + for (int month = 1; month <= 12; month++) { + String monthPart = + enablePartitionValueOnly + ? partPath + "/" + month + : partPath + "/month=" + month; + fileIO.mkdirs(new Path(tableLocation, monthPart)); + } + } + } + + private void createDtHourPartitionDirs( + LocalFileIO fileIO, + Path tableLocation, + String defaultPartName, + List> partitions) + throws IOException { + for (Pair partition : partitions) { + String dt = partition.getLeft(); + String hour = partition.getRight(); + String partPath; + if (enablePartitionValueOnly) { + partPath = dt + "/" + hour; + } else { + partPath = + String.format( + "dt=%s/hour=%s", + dt == null ? defaultPartName : dt, + hour == null ? 
defaultPartName : hour); + } + fileIO.mkdirs(new Path(tableLocation, partPath)); + } + } + + private FormatTable createYearMonthFormatTable(LocalFileIO fileIO, Path tableLocation) { + RowType rowType = + RowType.builder() + .field("year", DataTypes.INT()) + .field("month", DataTypes.INT()) + .field("a", DataTypes.INT()) + .build(); + return FormatTable.builder() + .fileIO(fileIO) + .identifier(Identifier.create("test_db", "test_table")) + .rowType(rowType) + .partitionKeys(Arrays.asList("year", "month")) + .location(tableLocation.toString()) + .format(FormatTable.Format.CSV) + .options( + Collections.singletonMap( + FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH.key(), + String.valueOf(enablePartitionValueOnly))) + .build(); + } + + private FormatTable createDtHourFormatTable(LocalFileIO fileIO, Path tableLocation) { + RowType rowType = + RowType.builder() + .field("dt", DataTypes.STRING()) + .field("hour", DataTypes.STRING()) + .field("a", DataTypes.INT()) + .build(); + return FormatTable.builder() + .fileIO(fileIO) + .identifier(Identifier.create("test_db", "test_table")) + .rowType(rowType) + .partitionKeys(Arrays.asList("dt", "hour")) + .location(tableLocation.toString()) + .format(FormatTable.Format.CSV) + .options( + Collections.singletonMap( + FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH.key(), + String.valueOf(enablePartitionValueOnly))) + .build(); + } + + private void assertContainsDefaultAndConcreteDt( + List, Path>> result, String defaultPartName) { + List dts = + result.stream() + .map(pair -> pair.getKey().get("dt")) + .distinct() + .sorted() + .collect(java.util.stream.Collectors.toList()); + assertEquals(Arrays.asList("20260625", defaultPartName), dts); + assertTrue( + result.stream() + .anyMatch( + pair -> + defaultPartName.equals(pair.getKey().get("dt")) + && "10".equals(pair.getKey().get("hour")))); + assertTrue( + result.stream() + .anyMatch( + pair -> + "20260625".equals(pair.getKey().get("dt")) + && "10".equals(pair.getKey().get("hour")))); + } + + 
private LocalFileIO countingFileIO(AtomicInteger listCount) { + return new LocalFileIO() { + @Override + public FileStatus[] listStatus(Path path) throws IOException { + listCount.getAndIncrement(); + return super.listStatus(path); + } + }; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java deleted file mode 100644 index 2713dae75b2a..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/table/format/predicate/PredicateUtilsTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.table.format.predicate; - -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowType; - -import org.junit.jupiter.api.Test; - -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link PredicateUtils}. 
*/ -public class PredicateUtilsTest { - - private final RowType partitionType = - RowType.builder().field("dt", DataTypes.INT()).field("hr", DataTypes.INT()).build(); - - private final RowType fullRowType = - RowType.builder() - .field("dt", DataTypes.INT()) - .field("hr", DataTypes.INT()) - .field("name", DataTypes.STRING()) - .field("value", DataTypes.INT()) - .build(); - - @Test - public void testMultiplePredicatesOnSamePartitionField() { - PredicateBuilder builder = new PredicateBuilder(fullRowType); - // hr > 10 AND hr < 20 - Predicate hrGreater = builder.greaterThan(1, 10); - Predicate hrLess = builder.lessThan(1, 20); - Predicate dtGreater = builder.greaterThan(0, 0); - Predicate dtLess = builder.lessThan(0, 10); - - Predicate predicate = PredicateBuilder.and(hrGreater, hrLess, dtGreater, dtLess); - - Map result = - PredicateUtils.splitPartitionPredicate(partitionType, predicate); - - assertThat(result).hasSize(2); - assertThat(result).containsKey("hr"); - assertThat(result).containsKey("dt"); - // Should combine with AND - Predicate combined = result.get("hr"); - // Test that the combined predicate works correctly - assertThat(combined.test(GenericRow.of(15))).isTrue(); - assertThat(combined.test(GenericRow.of(5))).isFalse(); - assertThat(combined.test(GenericRow.of(25))).isFalse(); - - combined = result.get("dt"); - // Test that the combined predicate works correctly - assertThat(combined.test(GenericRow.of(5))).isTrue(); - assertThat(combined.test(GenericRow.of(11))).isFalse(); - assertThat(combined.test(GenericRow.of(-1))).isFalse(); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionPathUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionPathUtilsTest.java new file mode 100644 index 000000000000..a21b168f54ea --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionPathUtilsTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import static org.apache.paimon.utils.PartitionPathUtils.mightMatch; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link PartitionPathUtils#mightMatch}. 
*/ +class PartitionPathUtilsTest { + + private final RowType partitionType = + RowType.builder() + .field("year", DataTypes.INT()) + .field("month", DataTypes.INT()) + .build(); + private final PredicateBuilder builder = new PredicateBuilder(partitionType); + + @Test + void testNullPredicate() { + assertThat(mightMatch(null, 0, 0, GenericRow.of(2024, 5))).isTrue(); + } + + @Test + void testBoundLeaf() { + Predicate p = builder.equal(0, 2024); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2024, null))).isTrue(); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2023, null))).isFalse(); + } + + @Test + void testLeafBeyondMaxIdxIsPossiblyTrue() { + // month leaf (idx 1), but only year (idx 0) is bound -> cannot decide -> possibly true + Predicate p = builder.equal(1, 12); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2024, null))).isTrue(); + } + + @Test + void testLeafBelowMinIdxIsPossiblyTrue() { + // year leaf (idx 0) belongs to the scan-path equality prefix (minIdx=1): treated as already + // satisfied and never read, even if values[0] does not match. 
+ Predicate p = builder.equal(0, 2024); + assertThat(mightMatch(p, 1, 1, GenericRow.of(2023, 5))).isTrue(); + } + + @Test + void testOrShortCircuit() { + Predicate p = PredicateBuilder.or(builder.equal(0, 2024), builder.equal(0, 2025)); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2025, null))).isTrue(); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2023, null))).isFalse(); + } + + @Test + void testAndShortCircuit() { + Predicate p = PredicateBuilder.and(builder.equal(0, 2024), builder.equal(1, 6)); + assertThat(mightMatch(p, 0, 1, GenericRow.of(2024, 6))).isTrue(); + assertThat(mightMatch(p, 0, 1, GenericRow.of(2024, 7))).isFalse(); + // month not yet known (maxIdx=0) -> And not provably false -> possibly true + assertThat(mightMatch(p, 0, 0, GenericRow.of(2024, null))).isTrue(); + // year already false -> prune regardless of month + assertThat(mightMatch(p, 0, 0, GenericRow.of(2023, null))).isFalse(); + } + + @Test + void testNestedCrossFieldOr() { + // (year = 2024 AND month < 6) OR (year = 2025 AND month >= 6) + Predicate p = + PredicateBuilder.or( + PredicateBuilder.and(builder.equal(0, 2024), builder.lessThan(1, 6)), + PredicateBuilder.and(builder.equal(0, 2025), builder.greaterOrEqual(1, 6))); + + // year level (only idx 0 bound): keep 2024 and 2025, prune others + assertThat(mightMatch(p, 0, 0, GenericRow.of(2024, null))).isTrue(); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2025, null))).isTrue(); + assertThat(mightMatch(p, 0, 0, GenericRow.of(2023, null))).isFalse(); + + // month level (both bound): exact + assertThat(mightMatch(p, 0, 1, GenericRow.of(2024, 5))).isTrue(); + assertThat(mightMatch(p, 0, 1, GenericRow.of(2024, 6))).isFalse(); + assertThat(mightMatch(p, 0, 1, GenericRow.of(2025, 6))).isTrue(); + assertThat(mightMatch(p, 0, 1, GenericRow.of(2025, 5))).isFalse(); + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala index 4d1d49a799c5..720d28320420 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala @@ -480,6 +480,49 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase with AdaptiveSpark } } + for (onlyValueInPath <- Seq(false, true)) { + val suffix = if (onlyValueInPath) " (partition-path-only-value)" else "" + test(s"Paimon format table: OR cross-field partition pruning$suffix") { + withTable("dwd_fact") { + val props = + if (onlyValueInPath) { + "'format-table.implementation'='paimon', 'format-table.partition-path-only-value'='true'" + } else { + "'format-table.implementation'='paimon'" + } + sql(s""" + |CREATE TABLE dwd_fact (id INT, amount DOUBLE, dt STRING, hour STRING) + |USING PARQUET + |TBLPROPERTIES ($props) + |PARTITIONED BY (dt, hour) + |""".stripMargin) + + sql(""" + |INSERT INTO dwd_fact VALUES + |(1, 10.0, '20260625', '10'), + |(2, 20.0, '20260625', '18'), + |(3, 30.0, '20260624', '20'), + |(4, 40.0, '20260624', '08'), + |(5, 50.0, '20260101', '10') + |""".stripMargin) + + val df = + sql(""" + |SELECT id, dt, hour FROM dwd_fact + |WHERE (dt = '20260625' AND hour < '16') OR (dt = '20260624' AND hour >= '16') + |ORDER BY id + |""".stripMargin) + + checkAnswer(df, Seq(Row(1, "20260625", "10"), Row(3, "20260624", "20"))) + + // The cross-field OR is pushed down as a partition filter, so only the two matching + // partitions produce splits. (Pruning effectiveness is covered by FormatTableScanTest.) + val filteredSplits = collectFilteredInputSplits(df.queryExecution.executedPlan, "dwd_fact") + assert(filteredSplits.size == 2) + } + } + } + def collectFilteredInputSplits(plan: SparkPlan, tableName: String): Seq[Split] = { flatMap(plan) { case s: BatchScanExec =>