diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java index 0301d3d47d..bd2a0e0b98 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java @@ -829,6 +829,15 @@ public class ConfigOptions { + "we would fsync after every message; if it were 5 we would fsync after every " + "five messages."); + public static final ConfigOption LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL = + key("log.flush.offset.checkpoint-interval") + .durationType() + .defaultValue(Duration.ofSeconds(60)) + .withDescription( + "The frequency with which we update the persistent record of the last " + + "flush which acts as the log recovery point. The default " + + "setting is 60 seconds."); + public static final ConfigOption LOG_REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL = key("log.replica.high-watermark.checkpoint-interval") .durationType() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index 3c804596af..db08aaaa28 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -63,6 +63,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -104,6 +105,7 @@ public final class LogManager extends TabletManagerBase { private final Map currentLogs = new ConcurrentHashMap<>(); private volatile Map recoveryPointCheckpoints; + private volatile ScheduledFuture recoveryPointCheckpointTask; private boolean loadLogsCompletedFlag = false; private LogManager( @@ -146,7 +148,17 @@ public static LogManager create( public void startup() { loadAllLogs(); - // TODO add more scheduler, like log-flusher etc. + long checkpointIntervalMs = + conf.get(ConfigOptions.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL).toMillis(); + LOG.info( + "Starting log recovery point checkpoint with a period of {} ms.", + checkpointIntervalMs); + recoveryPointCheckpointTask = + scheduler.schedule( + "fluss-recovery-point-checkpoint", + this::checkpointRecoveryOffsets, + checkpointIntervalMs, + checkpointIntervalMs); } private void initializeCheckpointMaps() throws IOException { @@ -447,6 +459,10 @@ private LogTablet loadLog( /** Close all the logs. */ public void shutdown() { LOG.info("Shutting down LogManager."); + if (recoveryPointCheckpointTask != null) { + recoveryPointCheckpointTask.cancel(false); + recoveryPointCheckpointTask = null; + } Map> logsByDataDir = new LinkedHashMap<>(); for (File dataDir : dataDirs) { @@ -616,6 +632,13 @@ void checkpointRecoveryOffsets(File dataDir) { .collect(Collectors.toList())); } + @VisibleForTesting + void checkpointRecoveryOffsets() { + for (File dataDir : dataDirs) { + checkpointRecoveryOffsets(dataDir); + } + } + private void checkpointRecoveryOffsets(File dataDir, List logs) { try { Map recoveryOffsets = new HashMap<>(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java index a6087e39b3..8706edfde5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java @@ -33,8 +33,10 @@ import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.testutils.common.AllCallbackWrapper; +import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.clock.SystemClock; import org.apache.fluss.utils.concurrent.FlussScheduler; +import org.apache.fluss.utils.concurrent.Scheduler; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -50,10 +52,18 @@ import javax.annotation.Nullable; import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Delayed; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static org.apache.fluss.record.TestData.ANOTHER_DATA1; import static org.apache.fluss.record.TestData.DATA1; @@ -68,6 +78,7 @@ import static org.apache.fluss.server.log.LogManager.CLEAN_SHUTDOWN_FILE; import static org.apache.fluss.testutils.DataTestUtils.assertLogRecordsEquals; import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; +import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link LogManager}. */ @@ -353,6 +364,139 @@ void testCheckpointRecoveryPointsAreWrittenPerDirectory() throws Exception { assertThat(dir2Checkpoints.get(tableBucket2)).isEqualTo(log2.getRecoveryPoint()); } + @Test + void testPeriodicRecoveryPointCheckpoint() throws Exception { + logManager.shutdown(); + logManager = null; + localDiskManager.close(); + localDiskManager = LocalDiskManager.create(conf); + + RecordingScheduler scheduler = new RecordingScheduler(); + logManager = + LogManager.create( + conf, + zkClient, + scheduler, + SystemClock.getInstance(), + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); + logManager.startup(); + + RecordingScheduledTask checkpointTask = + scheduler.getTask("fluss-recovery-point-checkpoint"); + assertThat((Object) checkpointTask).isNotNull(); + assertThat(checkpointTask.getDelayMs()).isEqualTo(60_000L); + assertThat(checkpointTask.getPeriodMs()).isEqualTo(60_000L); + + initTableBuckets(null); + LogTablet log1 = getOrCreateLog(tablePath1, null, tableBucket1); + log1.appendAsLeader(genMemoryLogRecordsByObject(DATA1)); + log1.flush(false); + + checkpointTask.run(); + + Map checkpoints = + new OffsetCheckpointFile( + new File(tempDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)) + .read(); + assertThat(checkpoints).containsEntry(tableBucket1, log1.getRecoveryPoint()); + } + + @Test + void testLogManagerPeriodicallyCheckpointsRecoveryPoints() throws Exception { + logManager.shutdown(); + logManager = null; + localDiskManager.close(); + + conf.set(ConfigOptions.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL, Duration.ofMillis(10)); + localDiskManager = LocalDiskManager.create(conf); + + FlussScheduler scheduler = new FlussScheduler(1); + scheduler.startup(); + try { + logManager = + LogManager.create( + conf, + zkClient, + scheduler, + SystemClock.getInstance(), + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); + logManager.startup(); + + initTableBuckets(null); + LogTablet log1 = getOrCreateLog(tablePath1, null, tableBucket1); + log1.appendAsLeader(genMemoryLogRecordsByObject(DATA1)); + log1.flush(false); + + waitUntil( + () -> { + Map checkpoints = + new OffsetCheckpointFile( + new File( + tempDir, + LogManager.RECOVERY_POINT_CHECKPOINT_FILE)) + .read(); + return checkpoints.containsKey(tableBucket1) + && checkpoints.get(tableBucket1).equals(log1.getRecoveryPoint()); + }, + Duration.ofSeconds(2), + "Timed out waiting for periodic recovery point checkpoint."); + } finally { + scheduler.shutdown(); + } + } + + @Test + void testRecoveryPointCheckpointSkipsFlushedSegmentsDuringRecovery() throws Exception { + initTableBuckets(null); + LogTablet log1 = getOrCreateLog(tablePath1, null, tableBucket1); + log1.appendAsLeader(genMemoryLogRecordsByObject(DATA1)); + + File firstSegmentFile = FlussPaths.logFile(log1.getLogDir(), 0L); + assertThat(firstSegmentFile).exists(); + long cleanSegmentSize = firstSegmentFile.length(); + + log1.roll(Optional.empty()); + long rolledSegmentBaseOffset = log1.activeLogSegment().getBaseOffset(); + log1.appendAsLeader(genMemoryLogRecordsByObject(ANOTHER_DATA1)); + log1.flush(false); + long recoveryPoint = log1.getRecoveryPoint(); + assertThat(recoveryPoint).isGreaterThan(rolledSegmentBaseOffset); + + logManager.checkpointRecoveryOffsets(tempDir); + Map checkpoints = + new OffsetCheckpointFile( + new File(tempDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE)) + .read(); + assertThat(checkpoints).containsEntry(tableBucket1, recoveryPoint); + + logManager.shutdown(); + logManager = null; + Files.deleteIfExists(new File(tempDir, CLEAN_SHUTDOWN_FILE).toPath()); + + assertThat(firstSegmentFile.length()).isEqualTo(cleanSegmentSize); + appendInvalidBytes(firstSegmentFile); + long corruptSegmentSize = firstSegmentFile.length(); + assertThat(corruptSegmentSize).isGreaterThan(cleanSegmentSize); + + localDiskManager.close(); + localDiskManager = LocalDiskManager.create(conf); + + logManager = + LogManager.create( + conf, + zkClient, + new FlussScheduler(1), + SystemClock.getInstance(), + TestingMetricGroups.TABLET_SERVER_METRICS, + localDiskManager); + logManager.startup(); + + assertThat(logManager.getLog(tableBucket1)).isPresent(); + assertThat(firstSegmentFile.length()).isEqualTo(corruptSegmentSize); + } + @Test @Tag(ServerTestTags.JBOD_MULTI_DIR_TAG) void testPerDirectoryCleanShutdownAndRecovery() throws Exception { @@ -425,6 +569,96 @@ private FetchDataInfo readLog(LogTablet log) throws Exception { return log.read(0, Integer.MAX_VALUE, FetchIsolation.LOG_END, true, null, null); } + private static void appendInvalidBytes(File file) throws IOException { + try (FileOutputStream outputStream = new FileOutputStream(file, true)) { + outputStream.write(new byte[] {0, 1, 2, 3}); + } + } + + private static final class RecordingScheduler implements Scheduler { + private final Map tasks = new HashMap<>(); + + @Override + public void startup() {} + + @Override + public void shutdown() {} + + @Override + public ScheduledFuture schedule( + String name, Runnable task, long delayMs, long periodMs) { + RecordingScheduledTask scheduledTask = + new RecordingScheduledTask(task, delayMs, periodMs); + tasks.put(name, scheduledTask); + return scheduledTask; + } + + private RecordingScheduledTask getTask(String name) { + return tasks.get(name); + } + } + + private static final class RecordingScheduledTask implements ScheduledFuture { + private final Runnable task; + private final long delayMs; + private final long periodMs; + private boolean cancelled; + + private RecordingScheduledTask(Runnable task, long delayMs, long periodMs) { + this.task = task; + this.delayMs = delayMs; + this.periodMs = periodMs; + } + + private long getDelayMs() { + return delayMs; + } + + private long getPeriodMs() { + return periodMs; + } + + private void run() { + task.run(); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delayMs, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return cancelled; + } + + @Override + public Void get() { + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) { + return null; + } + } + @AfterEach public void tearDown() throws Exception { if (logManager != null) { diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 0a4223074d..0d940acb81 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -118,6 +118,7 @@ during the Fluss cluster working. | log.index.interval-size | MemorySize | 4k | This setting controls how frequently Fluss adds an index entry to its offset index. The default setting ensures that we index a message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact position in the log but makes the index larger. You probably don't need to change this. | | log.file-preallocate | Boolean | false | True if we should preallocate the file on disk when creating a new log segment. | | log.flush.interval-messages | Long | Long.MAX_VALUE | This setting allows specifying an interval at which we will force a fsync of data written to the log. For example if this was set to 1, we would fsync after every message; if it were 5 we would fsync after every five messages. | +| log.flush.offset.checkpoint-interval | Duration | 1min | The frequency with which we update the persistent record of the last flush which acts as the log recovery point. The default setting is 60 seconds. | | log.replica.high-watermark.checkpoint-interval | Duration | 5s | The frequency with which the high watermark is saved out to disk. The default setting is 5 seconds. | | log.replica.max-lag-time | Duration | 30s | If a follower replica hasn't sent any fetch log requests or hasn't consumed up the leaders log end offset for at least this time, the leader will remove the follower replica from isr | | log.replica.write-operation-purge-number | Integer | 1000 | The purge number (in number of requests) of the write operation manager, the default value is 1000. |