Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
// Upsert compact merge task metrics
UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE("UpsertCompactMergeSegmentsSkipped", false),
// Number of segment-delete requests rejected because the targets participate in a live segment lineage entry.
LINEAGE_BLOCKED_DELETE_COUNT("LineageBlockedDeleteCount", false);
LINEAGE_BLOCKED_DELETE_COUNT("LineageBlockedDeleteCount", false),
// Segment uploads rejected for being outside the table retention window.
OUT_OF_RETENTION_SEGMENT_UPLOAD_REJECTED("OutOfRetentionSegmentUploadRejected", true);

private final String _brokerMeterName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,88 @@
*/
package org.apache.pinot.common.utils;

import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Utility methods for evaluating segment retention eligibility.
*/
/// Utility methods for evaluating segment retention eligibility and parsing table data retention from config.
public class RetentionUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(RetentionUtils.class);

private RetentionUtils() {
}

/**
* Implements the time-comparison and creation-time fallback logic used by {@code TimeRetentionStrategy} for
* completed segments. Does NOT check segment completion status — callers must guarantee that only completed
* segments (DONE or UPLOADED) are passed in.
* <ul>
* <li>If end time is valid: expired when {@code currentTimeMs - endTimeMs > retentionMs}.</li>
* <li>If end time is invalid and {@code useCreationTimeFallback} is true and creation time is valid:
* expired when {@code currentTimeMs - creationTimeMs > retentionMs}.</li>
* <li>Otherwise: not expired (fail-open).</li>
* </ul>
*
* @param tableNameWithType table name with type suffix, used for logging
* @param segmentZKMetadata segment metadata
* @param retentionMs retention period in milliseconds (must be positive)
* @param currentTimeMs current wall-clock time in milliseconds
* @param useCreationTimeFallback when true, fall back to creation time if end time is invalid
* (must match {@code controller.retentionManager.enableCreationTimeFallback})
* @return true if the segment is past the retention boundary, false otherwise
*/
/// Whether time-based data retention should run for this table's segments, matching the controller retention manager:
/// for {@link TableType#OFFLINE} tables, retention applies only when batch segment ingestion type is {@code APPEND}.
public static boolean shouldManageTimeBasedDataRetention(TableConfig tableConfig) {
if (tableConfig.getTableType() == TableType.OFFLINE && !"APPEND"
.equalsIgnoreCase(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig))) {
return false;
}
return true;
}

/// Implements the time-comparison and creation-time fallback logic used by `TimeRetentionStrategy` for completed
/// segments. Does **not** check segment completion status — callers must guarantee that only completed segments (DONE
/// or UPLOADED) are passed in.
///
/// - If end time is valid: expired when `currentTimeMs - endTimeMs > retentionMs`.
/// - If end time is invalid and `useCreationTimeFallback` is true and creation time is valid: expired when
/// `currentTimeMs - creationTimeMs > retentionMs`.
/// - Otherwise: not expired (fail-open).
///
/// @param tableNameWithType table name with type suffix, used for logging
/// @param segmentZKMetadata segment metadata
/// @param retentionMs retention period in milliseconds (must be positive)
/// @param currentTimeMs current wall-clock time in milliseconds
/// @param useCreationTimeFallback when true, fall back to creation time if end time is invalid (must match
/// `controller.retentionManager.enableCreationTimeFallback`)
/// @return `true` if the segment is past the retention boundary, `false` otherwise
public static boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata, long retentionMs,
long currentTimeMs, boolean useCreationTimeFallback) {
String segmentName = segmentZKMetadata.getSegmentName();
long endTimeMs = segmentZKMetadata.getEndTimeMs();
return isPurgeableInternal(segmentZKMetadata.getEndTimeMs(), segmentZKMetadata.getCreationTime(), retentionMs,
currentTimeMs, useCreationTimeFallback, tableNameWithType, segmentZKMetadata.getSegmentName());
}

/// Whether segment file metadata is past the retention window, using end time from
/// {@link #getSegmentMetadataEndTimeMillis(SegmentMetadata)} and
/// {@link SegmentMetadata#getIndexCreationTime()} as the optional fallback clock. Invalid-end paths emit the same
/// style of debug/warn lines as the ZK overload, using
/// {@link SegmentMetadata#getName()} for the segment in log messages.
///
/// - If end time is valid: purgeable when `currentTimeMs - endTimeMs > retentionMs`.
/// - If end time is invalid and `useCreationTimeFallback` is true and index creation time is valid: purgeable when
/// `currentTimeMs - creationTimeMs > retentionMs`.
/// - Otherwise: not purgeable (fail-open).
public static boolean isPurgeable(String tableNameWithType, SegmentMetadata segmentMetadata, long retentionMs,
long currentTimeMs, boolean useCreationTimeFallback) {
return isPurgeableInternal(getSegmentMetadataEndTimeMillis(segmentMetadata), segmentMetadata.getIndexCreationTime(),
retentionMs, currentTimeMs, useCreationTimeFallback, tableNameWithType, segmentMetadata.getName());
}

private static boolean isPurgeableInternal(long endTimeMs, long creationTimeMs, long retentionMs,
long currentTimeMs, boolean useCreationTimeFallback, String tableNameWithType,
String segmentName) {
if (TimeUtils.timeValueInValidRange(endTimeMs)) {
return currentTimeMs - endTimeMs > retentionMs;
}
if (useCreationTimeFallback && TimeUtils.timeValueInValidRange(creationTimeMs)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The creationTime fallback may be of no value for offline tables. It means that if a valid endTime was not computed for data then we fall back to the time when the segment was created. In many cases the createTimeMs will be closer to now(). so currentTimeMs - creationTimeMs > retentionMs; will always be false in fallback.

In case of upsertCompaction like minion tasks, the creationTime is preserved from the original segments so thay may be very old. The segments would get rejected here. It looks like a similar handling is done previously on minion side to avoid uploading segments near retention boundary: #18285

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tarun11Mavani, can you also take a look from upsertCompaction perspective.

Copy link
Copy Markdown
Author

@shuturmurgh shuturmurgh May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For single-segment upload we now call RetentionUtils.isPurgeable(..., useCreationTimeFallback=false), so we only use the segment’s data end time from file metadata.
Invalid/missing end time remains fail-open (no reject).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense.

LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
segmentName, tableNameWithType, endTimeMs, creationTimeMs);
return currentTimeMs - creationTimeMs > retentionMs;
}
if (useCreationTimeFallback) {
long creationTimeMs = segmentZKMetadata.getCreationTime();
if (TimeUtils.timeValueInValidRange(creationTimeMs)) {
LOGGER.debug("Segment: {} of table: {} has invalid end time: {}. Using creation time: {} as fallback",
segmentName, tableNameWithType, endTimeMs, creationTimeMs);
return currentTimeMs - creationTimeMs > retentionMs;
}
LOGGER.warn("Segment: {} of table: {} has invalid end time: {} and invalid creation time: {}. "
+ "Cannot determine retention, skipping", segmentName, tableNameWithType, endTimeMs, creationTimeMs);
} else {
Expand All @@ -74,4 +108,43 @@ public static boolean isPurgeable(String tableNameWithType, SegmentZKMetadata se
}
return false;
}

/// Parses {@link SegmentsValidationAndRetentionConfig#getRetentionTimeUnit()} and
/// {@link SegmentsValidationAndRetentionConfig#getRetentionTimeValue()} into table data retention duration in
/// milliseconds (same interpretation as controller time-based retention).
///
/// @return millis, or empty if unset or not parseable
public static OptionalLong parseTableDataRetentionMillis(
@Nullable SegmentsValidationAndRetentionConfig validationConfig) {
if (validationConfig == null) {
return OptionalLong.empty();
}
String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
String retentionTimeValue = validationConfig.getRetentionTimeValue();
if (StringUtils.isEmpty(retentionTimeUnit) || StringUtils.isEmpty(retentionTimeValue)) {
return OptionalLong.empty();
}
try {
return OptionalLong.of(TimeUnit.valueOf(retentionTimeUnit.toUpperCase())
.toMillis(Long.parseLong(retentionTimeValue)));
} catch (Exception e) {
Comment thread
shuturmurgh marked this conversation as resolved.
LOGGER.warn("Failed to parse table data retention: unit='{}' value='{}'", retentionTimeUnit, retentionTimeValue,
e);
return OptionalLong.empty();
}
}

/// Returns segment end time in epoch millis from on-disk {@link SegmentMetadata} for retention comparison, using
/// {@link SegmentMetadata#getTimeUnit()} and {@link SegmentMetadata#getEndTime()} (same raw fields minion task
/// executors pass when configuring generation from segment files). When `timeUnit` is non-null and raw end time is
/// not {@link Long#MIN_VALUE}, returns `timeUnit.toMillis(endTime)`; otherwise returns the raw end time (typically
/// `Long.MIN_VALUE` when unset).
public static long getSegmentMetadataEndTimeMillis(SegmentMetadata segmentMetadata) {
TimeUnit timeUnit = segmentMetadata.getTimeUnit();
Comment thread
shuturmurgh marked this conversation as resolved.
long endTime = segmentMetadata.getEndTime();
if (timeUnit != null && endTime != Long.MIN_VALUE) {
return timeUnit.toMillis(endTime);
}
return endTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@
*/
package org.apache.pinot.common.utils;

import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

Expand All @@ -47,6 +60,77 @@ private static SegmentZKMetadata makeSegmentWithCreationTime(long endTimeMs, lon
return segment;
}

private static SegmentMetadata mockSegmentMetadataMillis(long endTimeRaw, long indexCreationTimeMs) {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getName()).thenReturn("seg");
Mockito.when(metadata.getTimeInterval()).thenReturn(null);
Mockito.when(metadata.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
Mockito.when(metadata.getEndTime()).thenReturn(endTimeRaw);
Mockito.when(metadata.getIndexCreationTime()).thenReturn(indexCreationTimeMs);
return metadata;
}

@Test
public void getSegmentMetadataEndTimeMillisFromTimeUnitAndEndTime() {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
Mockito.when(metadata.getEndTime()).thenReturn(5000L);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), 5000L);
}

@Test
public void getSegmentMetadataEndTimeMillisConvertsWithTimeUnit() {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
Mockito.when(metadata.getEndTime()).thenReturn(2L);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), TimeUnit.DAYS.toMillis(2));
}

@Test
public void getSegmentMetadataEndTimeMillisUnsetReturnsRawEndTime() {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeUnit()).thenReturn(null);
Mockito.when(metadata.getEndTime()).thenReturn(Long.MIN_VALUE);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), Long.MIN_VALUE);
}

@Test
public void getSegmentMetadataEndTimeMillisNullTimeUnitReturnsRawEndTime() {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeUnit()).thenReturn(null);
Mockito.when(metadata.getEndTime()).thenReturn(12_345L);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), 12_345L);
}

@Test
public void getSegmentMetadataEndTimeMillisMinRawEndTimeReturnsMinWithoutConversion() {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
Mockito.when(metadata.getEndTime()).thenReturn(Long.MIN_VALUE);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), Long.MIN_VALUE);
}

@Test
public void getSegmentMetadataEndTimeMillisDoesNotReadTimeInterval() {
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeUnit()).thenReturn(TimeUnit.DAYS);
Mockito.when(metadata.getEndTime()).thenReturn(1L);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), TimeUnit.DAYS.toMillis(1));
verify(metadata, never()).getTimeInterval();
}

@Test
public void getSegmentMetadataEndTimeMillisIgnoresTimeIntervalWhenRawFieldsDiffer() {
Interval interval = new Interval(1000L, 9000L, DateTimeZone.UTC);
SegmentMetadata metadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(metadata.getTimeInterval()).thenReturn(interval);
Mockito.when(metadata.getTimeUnit()).thenReturn(TimeUnit.MILLISECONDS);
Mockito.when(metadata.getEndTime()).thenReturn(5000L);
clearInvocations(metadata);
Assert.assertEquals(RetentionUtils.getSegmentMetadataEndTimeMillis(metadata), 5000L);
verify(metadata, never()).getTimeInterval();
}

@Test
public void testExpiredSegmentIsPurgeable() {
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -127,4 +211,112 @@ public void testValidEndTimeTakesPriorityOverCreationTimeFallback() {
SegmentZKMetadata segment = makeSegmentWithCreationTime(now - 2 * ONE_DAY_MS, now - 10 * ONE_DAY_MS);
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, segment, RETENTION_MS, now, true));
}

@Test
public void testSegmentMetadataPurgeableExactBoundaryNotOutside() {
long now = System.currentTimeMillis();
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, mockSegmentMetadataMillis(now - RETENTION_MS, 0L),
RETENTION_MS, now, false));
}

@Test
public void testSegmentMetadataInvalidEndNoFallback() {
long now = System.currentTimeMillis();
assertFalse(RetentionUtils.isPurgeable(TABLE_NAME, mockSegmentMetadataMillis(-1, now - 10 * ONE_DAY_MS),
RETENTION_MS, now, false));
}

@Test
public void testSegmentMetadataInvalidEndWithFallback() {
long now = System.currentTimeMillis();
long creation = now - 10 * ONE_DAY_MS;
assertTrue(RetentionUtils.isPurgeable(TABLE_NAME, mockSegmentMetadataMillis(-1, creation),
RETENTION_MS, now, true));
}

@Test
public void shouldManageTimeBasedDataRetentionRealtime() {
TableConfig cfg = new TableConfigBuilder(TableType.REALTIME).setTableName("t").build();
assertTrue(RetentionUtils.shouldManageTimeBasedDataRetention(cfg));
}

@Test
public void shouldManageTimeBasedDataRetentionOfflineAppendDefault() {
TableConfig cfg = new TableConfigBuilder(TableType.OFFLINE).setTableName("t_OFFLINE").build();
assertTrue(RetentionUtils.shouldManageTimeBasedDataRetention(cfg));
}

@Test
public void shouldManageTimeBasedDataRetentionOfflineRefreshSkipped() {
TableConfig cfg = new TableConfigBuilder(TableType.OFFLINE).setTableName("t_OFFLINE").setSegmentPushType("REFRESH")
.build();
assertFalse(RetentionUtils.shouldManageTimeBasedDataRetention(cfg));
}

@Test
public void testParseTableDataRetentionMillisValidDays() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("DAYS");
cfg.setRetentionTimeValue("7");
Assert.assertTrue(RetentionUtils.parseTableDataRetentionMillis(cfg).isPresent());
Assert.assertEquals(RetentionUtils.parseTableDataRetentionMillis(cfg).getAsLong(), TimeUnit.DAYS.toMillis(7));
}

@Test
public void testParseTableDataRetentionMillisMissingReturnsEmpty() {
assertFalse(RetentionUtils.parseTableDataRetentionMillis(new SegmentsValidationAndRetentionConfig()).isPresent());
assertFalse(RetentionUtils.parseTableDataRetentionMillis(null).isPresent());
}

@Test
public void testParseTableDataRetentionMillisInvalidUnitReturnsEmpty() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("FORTNIGHTS");
cfg.setRetentionTimeValue("1");
assertFalse(RetentionUtils.parseTableDataRetentionMillis(cfg).isPresent());
}

@Test
public void testParseTableDataRetentionMillisEmptyUnitReturnsEmpty() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("");
cfg.setRetentionTimeValue("7");
assertFalse(RetentionUtils.parseTableDataRetentionMillis(cfg).isPresent());
}

@Test
public void testParseTableDataRetentionMillisEmptyValueReturnsEmpty() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("DAYS");
cfg.setRetentionTimeValue("");
assertFalse(RetentionUtils.parseTableDataRetentionMillis(cfg).isPresent());
}

@Test
public void testParseTableDataRetentionMillisInvalidValueReturnsEmpty() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("DAYS");
cfg.setRetentionTimeValue("not-a-number");
assertFalse(RetentionUtils.parseTableDataRetentionMillis(cfg).isPresent());
}

@Test
public void testParseTableDataRetentionMillisLowerCaseUnitParses() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("days");
cfg.setRetentionTimeValue("7");
OptionalLong parsed = RetentionUtils.parseTableDataRetentionMillis(cfg);
Assert.assertTrue(parsed.isPresent());
Assert.assertEquals(parsed.getAsLong(), TimeUnit.DAYS.toMillis(7));
}

@Test
public void testParseTableDataRetentionMillisValidHours() {
SegmentsValidationAndRetentionConfig cfg = new SegmentsValidationAndRetentionConfig();
cfg.setRetentionTimeUnit("HOURS");
cfg.setRetentionTimeValue("12");
OptionalLong parsed = RetentionUtils.parseTableDataRetentionMillis(cfg);
Assert.assertTrue(parsed.isPresent());
Assert.assertEquals(parsed.getAsLong(), TimeUnit.HOURS.toMillis(12));
}
}
Loading