diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java index afc58c851317..d2b1b8f3967e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java @@ -43,7 +43,12 @@ import static java.time.temporal.ChronoField.SECOND_OF_MINUTE; import static java.time.temporal.ChronoField.YEAR; -/** Time extractor to extract time from partition values. */ +/** + * Time extractor to extract time from partition values. + * + * @deprecated Use {@link PartitionTimeResolver} + */ +@Deprecated public class PartitionTimeExtractor implements Serializable { private static final long serialVersionUID = 1L; diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeResolver.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeResolver.java new file mode 100644 index 000000000000..9701e7c7eabd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeResolver.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.partition; + +import org.apache.paimon.utils.Pair; + +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.Period; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoField; +import java.time.temporal.TemporalAmount; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Resolves timestamp pattern and formatter to extract time step and compute partition values for + * chain table partitions. + */ +public class PartitionTimeResolver { + private static final Map FIELD_MAP = new HashMap<>(); + private final List partitionColumns; + private final String pattern; + private final String formatter; + private Map> patternFormatMappings; + private Map> patternSpanMappings; + private List patternTokens; + private List formatTokens; + + public PartitionTimeResolver(List partitionColumns, String pattern, String formatter) { + checkArgument(pattern != null, "pattern cannot be null"); + checkArgument(formatter != null, "formatter cannot be null"); + checkArgument(partitionColumns != null, "partitionColumns cannot be null"); + this.partitionColumns = partitionColumns; + this.pattern = pattern; + this.formatter = formatter; + init(); + } + + static { + FIELD_MAP.put('y', ChronoField.YEAR); + FIELD_MAP.put('M', ChronoField.MONTH_OF_YEAR); + FIELD_MAP.put('d', ChronoField.DAY_OF_MONTH); + FIELD_MAP.put('H', ChronoField.HOUR_OF_DAY); + FIELD_MAP.put('h', ChronoField.CLOCK_HOUR_OF_AMPM); + FIELD_MAP.put('m', ChronoField.MINUTE_OF_HOUR); + FIELD_MAP.put('s', ChronoField.SECOND_OF_MINUTE); + } + + private void init() { + this.patternFormatMappings = new HashMap<>(); + this.patternTokens = parsePattern(); + this.formatTokens = parseFormatter(); + boolean matched = matchRecursive(0, 0); + checkArgument( + matched, "Failed to match pattern '%s' to formatter '%s'", pattern, formatter); + this.patternSpanMappings = calPatternSpanMappings(); + } + + /** + * Extracts the minimum time step from the given pattern and formatter. + * + * @return the smallest {@link Duration} or {@link Period} step among variable-controlled time + * units + */ + public TemporalAmount extractMinStep() { + List fieldTokens = + patternFormatMappings.values().stream() + .flatMap(Collection::stream) + .filter(token -> token instanceof TimeFieldToken) + .map(token -> (TimeFieldToken) token) + .collect(Collectors.toList()); + + Optional min = + fieldTokens.stream().min(Comparator.comparingInt(span -> span.field.ordinal())); + checkArgument(min.isPresent(), "No time unit found in variable ranges"); + ChronoField field = min.get().field; + return stepOf(field); + } + + /** + * Computes partition column values by formatting the given datetime and extracting each + * variable's segment according to the pattern-to-format mapping. + */ + public LinkedHashMap resolvePartitionValues(LocalDateTime dateTime) { + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(formatter); + String formatted = dateTime.format(dateTimeFormatter); + LinkedHashMap result = new LinkedHashMap<>(); + for (Map.Entry> entry : + patternSpanMappings.entrySet()) { + String variableName = entry.getKey().token.substring(1); + int start = entry.getValue().getLeft(); + int end = entry.getValue().getRight(); + result.put(variableName, formatted.substring(start, end)); + } + return result; + } + + public LocalDateTime parsePartitionValues(List partitionValues) { + StringBuilder timestampString = new StringBuilder(); + int valueIdx = 0; + for (PatternToken token : patternTokens) { + if (token.isVariable) { + timestampString.append(partitionValues.get(valueIdx).toString()); + valueIdx++; + } else { + timestampString.append(token.token); + } + } + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(formatter, Locale.ROOT); + try { + return LocalDateTime.parse(timestampString, Objects.requireNonNull(dateTimeFormatter)); + } catch (DateTimeParseException e) { + return LocalDateTime.of( + LocalDate.parse(timestampString, Objects.requireNonNull(dateTimeFormatter)), + LocalTime.MIDNIGHT); + } + } + + private Map> calPatternSpanMappings() { + int pos = 0; + Map startPositions = new LinkedHashMap<>(); + for (FormatToken token : formatTokens) { + startPositions.put(token, pos); + pos += token.getLength(); + } + + Map> patternSpanMapping = new LinkedHashMap<>(); + for (PatternToken patternToken : patternTokens) { + if (!patternToken.isVariable) { + continue; + } + List tokens = patternFormatMappings.get(patternToken); + int start = startPositions.get(tokens.get(0)); + int end = start; + for (FormatToken token : tokens) { + end += token.getLength(); + } + patternSpanMapping.put(patternToken, Pair.of(start, end)); + } + return patternSpanMapping; + } + + /** Parses formatter into format tokens (time fields and literals). */ + private List parseFormatter() { + List tokens = new ArrayList<>(); + for (int pos = 0; pos < formatter.length(); pos++) { + char c = formatter.charAt(pos); + if (isTimeChar(c)) { + int start = pos; + while (pos < formatter.length() && formatter.charAt(pos) == c) { + pos++; + } + ChronoField field = FIELD_MAP.get(c); + tokens.add(new TimeFieldToken(field, start, pos)); + pos--; + } else if (c == '\'') { + // parse literals + int start = pos++; + for (; pos < formatter.length(); pos++) { + if (formatter.charAt(pos) == '\'') { + if (pos + 1 < formatter.length() && formatter.charAt(pos + 1) == '\'') { + pos++; + } else { + break; // end of literal + } + } + } + checkArgument( + pos < formatter.length(), + "Pattern ends with an incomplete string literal: " + formatter); + String str = formatter.substring(start + 1, pos); + if (str.isEmpty()) { + tokens.add(new LiteralToken("'", start, pos + 1)); + } else { + tokens.add(new LiteralToken(str.replace("''", "'"), start, pos + 1)); + } + } else { + String text = String.valueOf(c); + tokens.add(new LiteralToken(text, pos, pos + 1)); + } + } + checkArgument(!tokens.isEmpty(), "No time unit found in formatter: %s", formatter); + return tokens; + } + + private static boolean isTimeChar(char c) { + return FIELD_MAP.containsKey(c); + } + + /** Parses pattern string into pattern tokens (variables and literals). */ + private List parsePattern() { + List tokens = new ArrayList<>(); + int len = pattern.length(); + int cursor = 0; + int partCursor = 0; + StringBuilder literalBuf = new StringBuilder(); + while (cursor < len) { + char curr = pattern.charAt(cursor); + if (curr == '$') { + if (literalBuf.length() > 0) { + tokens.add(new PatternToken(literalBuf.toString(), false)); + literalBuf.setLength(0); + } + checkArgument( + partCursor < partitionColumns.size(), + "Extra variable in pattern, exceed partitionColumns count"); + String part = curr + partitionColumns.get(partCursor); + checkArgument(pattern.substring(cursor).startsWith(part)); + tokens.add(new PatternToken(part, true)); + cursor += part.length(); + partCursor++; + } else { + literalBuf.append(curr); + cursor++; + } + } + if (literalBuf.length() > 0) { + tokens.add(new PatternToken(literalBuf.toString(), false)); + } + return tokens; + } + + /** + * Recursively matches pattern tokens to format tokens. For variable tokens, greedily consumes + * consecutive format tokens. For literal tokens, verifies length and content match. + */ + private boolean matchRecursive(int patternIdx, int formatIdx) { + if (patternIdx == patternTokens.size()) { + return formatIdx == formatTokens.size(); + } + + // Remaining format tokens must be at least as many as remaining pattern tokens + if (formatTokens.size() - formatIdx < patternTokens.size() - patternIdx) { + return false; + } + + PatternToken patternToken = patternTokens.get(patternIdx); + // Max format tokens this pattern token can consume, leaving at least 1 token per remaining + // pattern token + int maxLen = formatTokens.size() - formatIdx - (patternTokens.size() - patternIdx - 1); + + for (int len = 1; len <= maxLen; len++) { + int endSpanIdx = formatIdx + len; + if (patternToken.isVariable) { + if (matchRecursive(patternIdx + 1, endSpanIdx)) { + patternFormatMappings.put( + patternToken, formatTokens.subList(formatIdx, endSpanIdx)); + return true; + } + } else { + // Literal pattern tokens match 1...len consecutive format tokens, split by token + // length + if (matchLiteral(patternToken.token, formatTokens, formatIdx, endSpanIdx)) { + if (matchRecursive(patternIdx + 1, endSpanIdx)) { + return true; + } + } + } + } + return false; + } + + /** + * Checks if a literal pattern token matches a sequence of format tokens. Verifies total length + * and literal content match. + */ + private static boolean matchLiteral( + String patternToken, List formatTokens, int startIdx, int endIdx) { + int formatTokenTotalLen = 0; + for (int i = startIdx; i < endIdx; i++) { + formatTokenTotalLen += formatTokens.get(i).getLength(); + } + if (patternToken.length() != formatTokenTotalLen) { + return false; + } + + int pos = 0; + for (int i = startIdx; i < endIdx; i++) { + FormatToken span = formatTokens.get(i); + int spanLen = span.getLength(); + String sub = patternToken.substring(pos, pos + spanLen); + + if (span instanceof LiteralToken) { + if (!((LiteralToken) span).token.equals(sub)) { + return false; + } + } + // TimeFieldToken: length already verified, content is unrestricted + pos += spanLen; + } + return true; + } + + private static TemporalAmount stepOf(ChronoField field) { + switch (field) { + case SECOND_OF_MINUTE: + return Duration.ofSeconds(1); + case MINUTE_OF_HOUR: + return Duration.ofMinutes(1); + case HOUR_OF_DAY: + case CLOCK_HOUR_OF_AMPM: + return Duration.ofHours(1); + case DAY_OF_MONTH: + return Duration.ofDays(1); + case MONTH_OF_YEAR: + return Period.ofMonths(1); + case YEAR: + return Period.ofYears(1); + default: + throw new IllegalStateException("Unsupported field: " + field); + } + } + + private static class FormatToken { + final int start; + final int end; + + private FormatToken(int start, int end) { + this.start = start; + this.end = end; + } + + public int getLength() { + return end - start; + } + } + + private static class LiteralToken extends FormatToken { + final String token; + + LiteralToken(String token, int start, int end) { + super(start, end); + this.token = token; + } + + @Override + public int getLength() { + return token.length(); + } + + @Override + public String toString() { + return String.format("LiteralToken(token=%s, start=%d, end=%d)", token, start, end); + } + } + + private static class TimeFieldToken extends FormatToken { + final ChronoField field; + + TimeFieldToken(ChronoField field, int start, int end) { + super(start, end); + this.field = field; + } + + @Override + public String toString() { + return String.format("TimeFieldToken(field=%s, start=%d, end=%d)", field, start, end); + } + } + + private static class PatternToken { + final String token; + final boolean isVariable; + + PatternToken(String token, boolean isVariable) { + this.token = token; + this.isVariable = isVariable; + } + + @Override + public String toString() { + return String.format("PatternToken(token='%s', isVariable=%s)", token, isVariable); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java index e6d4e4aadf3c..c1cb699f6d7f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java @@ -22,7 +22,7 @@ import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.serializer.InternalRowSerializer; -import org.apache.paimon.partition.PartitionTimeExtractor; +import org.apache.paimon.partition.PartitionTimeResolver; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.ChainGroupReadTable; @@ -31,16 +31,12 @@ import org.apache.paimon.types.RowType; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAmount; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.function.BiFunction; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; /** Utils for chain table. */ public class ChainTableUtils { @@ -75,61 +71,33 @@ public static List getDeltaPartitions( List partitionColumns, RowType partType, CoreOptions options, - RecordComparator partitionComparator, InternalRowPartitionComputer partitionComputer) { InternalRowSerializer serializer = new InternalRowSerializer(partType); List deltaPartitions = new ArrayList<>(); - boolean isDailyPartition = partitionColumns.size() == 1; List startPartitionValues = new ArrayList<>(partitionComputer.generatePartValues(beginPartition).values()); List endPartitionValues = new ArrayList<>(partitionComputer.generatePartValues(endPartition).values()); - PartitionTimeExtractor timeExtractor = - new PartitionTimeExtractor( - options.partitionTimestampPattern(), options.partitionTimestampFormatter()); - LocalDateTime stratPartitionTime = - timeExtractor.extract(partitionColumns, startPartitionValues); - LocalDateTime candidateTime = stratPartitionTime; - LocalDateTime endPartitionTime = - timeExtractor.extract(partitionColumns, endPartitionValues); + PartitionTimeResolver timeResolver = + new PartitionTimeResolver( + partitionColumns, + options.partitionTimestampPattern(), + options.partitionTimestampFormatter()); + LocalDateTime stratPartitionTime = timeResolver.parsePartitionValues(startPartitionValues); + LocalDateTime endPartitionTime = timeResolver.parsePartitionValues(endPartitionValues); + TemporalAmount step = timeResolver.extractMinStep(); + LocalDateTime candidateTime = stratPartitionTime.plus(step); while (!candidateTime.isAfter(endPartitionTime)) { - if (isDailyPartition) { - if (candidateTime.isAfter(stratPartitionTime)) { - deltaPartitions.add( - serializer - .toBinaryRow( - InternalRowPartitionComputer.convertSpecToInternalRow( - calPartValues( - candidateTime, - partitionColumns, - options.partitionTimestampPattern(), - options.partitionTimestampFormatter()), - partType, - options.partitionDefaultName())) - .copy()); - } - } else { - for (int hour = 0; hour <= 23; hour++) { - candidateTime = candidateTime.toLocalDate().atStartOfDay().plusHours(hour); - BinaryRow candidatePartition = - serializer - .toBinaryRow( - InternalRowPartitionComputer.convertSpecToInternalRow( - calPartValues( - candidateTime, - partitionColumns, - options.partitionTimestampPattern(), - options.partitionTimestampFormatter()), - partType, - options.partitionDefaultName())) - .copy(); - if (partitionComparator.compare(candidatePartition, beginPartition) > 0 - && partitionComparator.compare(candidatePartition, endPartition) <= 0) { - deltaPartitions.add(candidatePartition); - } - } - } - candidateTime = candidateTime.toLocalDate().plusDays(1).atStartOfDay(); + BinaryRow candidatePartition = + serializer + .toBinaryRow( + InternalRowPartitionComputer.convertSpecToInternalRow( + timeResolver.resolvePartitionValues(candidateTime), + partType, + options.partitionDefaultName())) + .copy(); + deltaPartitions.add(candidatePartition); + candidateTime = candidateTime.plus(step); } return deltaPartitions; } @@ -166,48 +134,6 @@ public static Predicate createLinearPredicate( return PredicateBuilder.and(fieldPredicates); } - public static LinkedHashMap calPartValues( - LocalDateTime dateTime, - List partitionKeys, - String timestampPattern, - String timestampFormatter) { - DateTimeFormatter formatter = DateTimeFormatter.ofPattern(timestampFormatter); - String formattedDateTime = dateTime.format(formatter); - Pattern keyPattern = Pattern.compile("\\$(\\w+)"); - Matcher keyMatcher = keyPattern.matcher(timestampPattern); - List keyOrder = new ArrayList<>(); - StringBuilder regexBuilder = new StringBuilder(); - int lastPosition = 0; - while (keyMatcher.find()) { - regexBuilder.append( - Pattern.quote(timestampPattern.substring(lastPosition, keyMatcher.start()))); - regexBuilder.append("(.+)"); - keyOrder.add(keyMatcher.group(1)); - lastPosition = keyMatcher.end(); - } - regexBuilder.append(Pattern.quote(timestampPattern.substring(lastPosition))); - - Matcher valueMatcher = Pattern.compile(regexBuilder.toString()).matcher(formattedDateTime); - if (!valueMatcher.matches() || valueMatcher.groupCount() != keyOrder.size()) { - throw new IllegalArgumentException( - "Formatted datetime does not match timestamp pattern"); - } - - Map keyValues = new HashMap<>(); - for (int i = 0; i < keyOrder.size(); i++) { - keyValues.put(keyOrder.get(i), valueMatcher.group(i + 1)); - } - List values = - partitionKeys.stream() - .map(key -> keyValues.getOrDefault(key, "")) - .collect(Collectors.toList()); - LinkedHashMap res = new LinkedHashMap<>(); - for (int i = 0; i < partitionKeys.size(); i++) { - res.put(partitionKeys.get(i), values.get(i)); - } - return res; - } - public static boolean isScanFallbackDeltaBranch(CoreOptions options) { return options.isChainTable() && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch()); @@ -312,7 +238,6 @@ public static List getDeltaPartitionsWithProjector( chainPartitionColumns, chainPartType, options, - chainPartitionComparator, chainPartitionComputer); // Combine each chain-only BinaryRow with the group part into a full partition diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java index 971407b92815..f131bbc20d10 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java @@ -34,17 +34,14 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; /** Test class for {@link org.apache.paimon.utils.ChainTableUtils}. */ public class ChainTableUtilsTest { @@ -177,38 +174,6 @@ public void testCreateLinearPredicate() { Assertions.assertTrue(predicate.equals(expected)); } - @Test - public void testGeneratePartitionValues() { - LinkedHashMap partitionValues = - ChainTableUtils.calPartValues( - LocalDateTime.of(2023, 1, 1, 12, 0, 0), - Arrays.asList("dt", "hour"), - "$dt $hour:00:00", - "yyyyMMdd HH:mm:ss"); - assertEquals( - new LinkedHashMap() { - { - put("dt", "20230101"); - put("hour", "12"); - } - }, - partitionValues); - - partitionValues = - ChainTableUtils.calPartValues( - LocalDateTime.of(2023, 1, 1, 0, 0, 0), - Arrays.asList("dt"), - "$dt", - "yyyyMMdd"); - assertEquals( - new LinkedHashMap() { - { - put("dt", "20230101"); - } - }, - partitionValues); - } - // ========================== Tests for findFirstLatestPartitionsWithProjector // ========================== diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionTimeResolverTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionTimeResolverTest.java new file mode 100644 index 000000000000..f05d55be3317 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionTimeResolverTest.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.codegen.RecordComparator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionTimeResolver; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.assertj.core.util.Lists; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.Period; +import java.time.temporal.TemporalAmount; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Tests for {@link PartitionTimeResolver}. */ +public class PartitionTimeResolverTest { + + private TemporalAmount extractMinStep( + String pattern, String formatter, String... partitionColumns) { + return new PartitionTimeResolver(Arrays.asList(partitionColumns), pattern, formatter) + .extractMinStep(); + } + + /** Extract a string value from a BinaryRow at the given position. */ + private static String getString(BinaryRow row, int pos) { + return row.getString(pos).toString(); + } + + private static BinaryRow row(List values) { + BinaryRow row = new BinaryRow(values.size()); + BinaryRowWriter writer = new BinaryRowWriter(row); + for (int i = 0; i < values.size(); i++) { + writer.writeString(i, BinaryString.fromString(values.get(i))); + } + writer.complete(); + return row; + } + + @Test + public void testExtractMinStepWithDuration() { + assertThat(extractMinStep("$y$M$d$H$m$s", "yyyyMMddHHmmss", "y", "M", "d", "H", "m", "s")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("$y$M$d $H$m$s", "yyyyMMdd HHmmss", "y", "M", "d", "H", "m", "s")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat( + extractMinStep( + "$y-$M-$d $H:$m:$s", + "yyyy-MM-dd HH:mm:ss", + "y", + "M", + "d", + "H", + "m", + "s")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat( + extractMinStep( + "$y-$M-$d T $H:$m:$s", + "yyyy-MM-dd 'T' HH:mm:ss", + "y", + "M", + "d", + "H", + "m", + "s")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("$a", "yyyyMMddHHmmss", "a")).isEqualTo(Duration.ofSeconds(1)); + + assertThat(extractMinStep("$a$aaT$aaa$a4Z", "yyMMdd'T'HHmmss'Z'", "a", "aa", "aaa", "a4")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("$a12$aaT$aaa00Z", "yyyyMMdd'T'HHmmss'Z'", "a", "aa", "aaa")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("$aT$a1$a200", "yyyyMMdd'T'HHmmss", "a", "a1", "a2")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("$aT$aa", "yyyyMMdd'T'HHmm", "a", "aa")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("$a", "yyyyMMdd'T'HHmmss", "a")).isEqualTo(Duration.ofSeconds(1)); + + assertThat(extractMinStep("$ab$c $d:$e:$f", "yyyyMMdd HH:mm:ss", "ab", "c", "d", "e", "f")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("$day $a:$b", "yyyyMMdd HH:mm", "day", "a", "b")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("$aa$a", "yyyy/MM/ddHH", "aa", "a")) + .isEqualTo(Duration.ofHours(1)); + + assertThat(extractMinStep("$a $b", "HH:mm:ss yyyyMMdd", "a", "b")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("12:$a $b", "HH:mm:ss yyyyMMdd", "a", "b")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("12:$a:01 $b", "HH:mm:ss yyyyMMdd", "a", "b")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("12:02:01 $b", "HH:mm:ss yyyyMMdd", "b")) + .isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$hour:00:00 $date", "HH:mm:ss yyyyMMdd", "hour", "date")) + .isEqualTo(Duration.ofHours(1)); + assertThat(extractMinStep("00:00:00 $b", "HH:mm:ss yyyyMMdd", "b")) + .isEqualTo(Duration.ofDays(1)); + assertThat( + extractMinStep( + "$hour_minute:01 $date", + "HH:mm:ss yyyyMMdd", + "hour_minute", + "date")) + .isEqualTo(Duration.ofMinutes(1)); + assertThat(extractMinStep("12:$a $b", "HH:mm:ss yyMMdd", "a", "b")) + .isEqualTo(Duration.ofSeconds(1)); + assertThat(extractMinStep("12$b", "HHmmss", "b")).isEqualTo(Duration.ofSeconds(1)); + + assertThat(extractMinStep("$a", "yyyyMMddhh", "a")).isEqualTo(Duration.ofHours(1)); + assertThat(extractMinStep("$date $time", "yyyyMMdd hh", "date", "time")) + .isEqualTo(Duration.ofHours(1)); + assertThat(extractMinStep("$a$b", "yyyyMMddhh", "a", "b")).isEqualTo(Duration.ofHours(1)); + } + + @Test + public void testExtractMinStepWithPeriod() { + assertThat(extractMinStep("$a-01-$b", "yyyy-MM-dd", "a", "b")) + .isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a-01", "yyyy-MM-dd", "a")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$y/$m/$d", "yyyy/MM/dd", "y", "m", "d")) + .isEqualTo(Duration.ofDays(1)); + + assertThat(extractMinStep("$a", "yyyyMMdd", "a")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a01", "yyyyMMdd", "a")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a$aa", "yyyyMMdd", "a", "aa")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("202601$a", "yyyyMMdd", "a")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("2026$a01", "yyyyMMdd", "a")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a1201", "yyyyMMdd", "a")).isEqualTo(Period.ofYears(1)); + assertThat(extractMinStep("$a01", "yyyyMMdd", "a")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a1201", "yyyyMMdd", "a")).isEqualTo(Period.ofYears(1)); + + assertThat(extractMinStep("$a01", "yyMMdd", "a")).isEqualTo(Period.ofMonths(1)); + assertThat(extractMinStep("$a1201", "yyMMdd", "a")).isEqualTo(Period.ofYears(1)); + assertThat(extractMinStep("$a$aa", "yyMMdd", "a", "aa")).isEqualTo(Duration.ofDays(1)); + assertThat(extractMinStep("$a'", "yyMMdd''", "a")).isEqualTo(Duration.ofDays(1)); + } + + @Test + public void testResolvePartitionValues() { + LinkedHashMap partitionValues = + new PartitionTimeResolver( + Arrays.asList("dt", "hour"), "$dt $hour:00:00", "yyyyMMdd HH:mm:ss") + .resolvePartitionValues(LocalDateTime.of(2023, 1, 1, 12, 0, 0)); + assertEquals( + new LinkedHashMap() { + { + put("dt", "20230101"); + put("hour", "12"); + } + }, + partitionValues); + + partitionValues = + new PartitionTimeResolver(Arrays.asList("dt"), "$dt", "yyyyMMdd") + .resolvePartitionValues(LocalDateTime.of(2023, 1, 1, 0, 0, 0)); + assertEquals( + new LinkedHashMap() { + { + put("dt", "20230101"); + } + }, + partitionValues); + } + + @Test + public void testParsePartitionValues() { + PartitionTimeResolver resolver = + new PartitionTimeResolver( + Arrays.asList("a", "aa", "aaa"), + "$a-$aa-$aaa 00:00:00", + "yyyy-MM-dd HH:mm:ss"); + assertThat(resolver.parsePartitionValues(Arrays.asList("2023", "01", "01"))) + .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); + + resolver = + new PartitionTimeResolver( + Arrays.asList("aa", "a", "aaa"), "$aa$a$aaaT000000", "yyyyMMdd'T'HHmmss"); + assertThat(resolver.parsePartitionValues(Arrays.asList("2023", "01", "01"))) + .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); + + resolver = + new PartitionTimeResolver(Arrays.asList("aa", "a", "aaa"), "$aa$a$aaa", "yyyyMMdd"); + assertThat(resolver.parsePartitionValues(Arrays.asList("2023", "01", "01"))) + .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00")); + } + + @Test + public void testParsePartitionValuesWithHourMinuteGranularity() { + // partition keys: (region, dt, hour_minute), chain keys: (dt, hour_minute) + RowType fullType = + RowType.builder() + .field("region", DataTypes.STRING().notNull()) + .field("dt", DataTypes.STRING().notNull()) + .field("hour_minute", DataTypes.STRING().notNull()) + .build(); + + ChainPartitionProjector projector = new ChainPartitionProjector(fullType, 2); + + // Compare chain partition (dt, hour_minute) lexicographically + RecordComparator chainComparator = (a, b) -> a.getString(1).compareTo(b.getString(1)); + + Options opts = new Options(); + opts.set(CoreOptions.PARTITION_TIMESTAMP_PATTERN, "$dtT$hour_minute"); + opts.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd'T'HHmm"); + CoreOptions options = new CoreOptions(opts); + + BinaryRow begin = row(Lists.newArrayList("CN", "20260609", "1010")); + BinaryRow end = row(Lists.newArrayList("CN", "20260609", "1015")); + + List deltas = + ChainTableUtils.getDeltaPartitionsWithProjector( + begin, end, options, chainComparator, projector); + + assertThat(deltas).hasSize(5); + for (BinaryRow delta : deltas) { + assertThat(getString(delta, 0)).isEqualTo("CN"); + assertThat(getString(delta, 1)).isEqualTo("20260609"); + } + assertThat(getString(deltas.get(0), 2)).isEqualTo("1011"); + assertThat(getString(deltas.get(1), 2)).isEqualTo("1012"); + assertThat(getString(deltas.get(2), 2)).isEqualTo("1013"); + assertThat(getString(deltas.get(3), 2)).isEqualTo("1014"); + assertThat(getString(deltas.get(4), 2)).isEqualTo("1015"); + } + + @Test + public void testParsePartitionValuesWithSeparateHourAndMinute() { + // partition keys: (region, dt, hour, minute), chain keys: (dt, hour, minute) + RowType fullType = + RowType.builder() + .field("region", DataTypes.STRING().notNull()) + .field("dt", DataTypes.STRING().notNull()) + .field("hour", DataTypes.STRING().notNull()) + .field("minute", DataTypes.STRING().notNull()) + .build(); + + ChainPartitionProjector projector = new ChainPartitionProjector(fullType, 3); + + // Compare chain partition (dt, hour, minute) lexicographically + RecordComparator chainComparator = (a, b) -> a.getString(2).compareTo(b.getString(2)); + + Options opts = new Options(); + opts.set(CoreOptions.PARTITION_TIMESTAMP_PATTERN, "$dtT$hour$minute00"); + opts.set(CoreOptions.PARTITION_TIMESTAMP_FORMATTER, "yyyyMMdd'T'HHmmss"); + CoreOptions options = new CoreOptions(opts); + + BinaryRow begin = row(Lists.newArrayList("CN", "20260609", "10", "10")); + BinaryRow end = row(Lists.newArrayList("CN", "20260609", "10", "15")); + + List deltas = + ChainTableUtils.getDeltaPartitionsWithProjector( + begin, end, options, chainComparator, projector); + + assertThat(deltas).hasSize(5); + for (BinaryRow delta : deltas) { + assertThat(getString(delta, 0)).isEqualTo("CN"); + assertThat(getString(delta, 1)).isEqualTo("20260609"); + assertThat(getString(delta, 2)).isEqualTo("10"); + } + assertThat(getString(deltas.get(0), 3)).isEqualTo("11"); + assertThat(getString(deltas.get(1), 3)).isEqualTo("12"); + assertThat(getString(deltas.get(2), 3)).isEqualTo("13"); + assertThat(getString(deltas.get(3), 3)).isEqualTo("14"); + assertThat(getString(deltas.get(4), 3)).isEqualTo("15"); + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java index af457ac63fe5..26d7735525f5 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java @@ -2420,4 +2420,61 @@ public void testChainTableWithMultiChainKeys(@TempDir java.nio.file.Path tempDir spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); spark.close(); } + + @Test + public void testChainTableWithMinuteLevelPartitions(@TempDir java.nio.file.Path tempDir) + throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = createSparkSessionBuilder(warehousePath); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + spark.sql( + "CREATE TABLE `chain_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + ") PARTITIONED BY (`dt` STRING, `hr_min` STRING)\n" + + "TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,hr_min,t1',\n" + + " 'partition.timestamp-pattern' = '$dt $hr_min:00',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '1',\n" + + " 'merge-engine' = 'deduplicate',\n" + + " 'sequence.field' = 't2',\n" + + " 'chain-table.chain-partition-keys' = 'dt,hr_min'\n" + + ");"); + + setupChainTableBranches(spark, "chain_test"); + + spark.sql( + "INSERT INTO TABLE `chain_test$branch_snapshot` PARTITION (dt = '20250810', hr_min='01:01') VALUES (3, 1, '3');"); + spark.sql( + "INSERT INTO TABLE `chain_test$branch_snapshot` PARTITION (dt = '20250810', hr_min='03:30') VALUES (4, 1, '4');"); + + spark.sql( + "INSERT INTO TABLE `chain_test$branch_delta` PARTITION (dt = '20250810', hr_min='03:35') VALUES (5, 1, '5');"); + spark.sql( + "INSERT INTO TABLE `chain_test$branch_delta` PARTITION (dt = '20250810', hr_min='03:40') VALUES (6, 1, '6');"); + spark.sql( + "INSERT INTO TABLE `chain_test$branch_delta` PARTITION (dt = '20250810', hr_min='03:45') VALUES (7, 1, '7');"); + + assertThat( + spark + .sql( + "select * from `chain_test` where dt='20250810' and hr_min='03:40'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder( + "[4,1,4,20250810,03:40]", + "[5,1,5,20250810,03:40]", + "[6,1,6,20250810,03:40]"); + + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;"); + spark.close(); + } }