Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,15 @@ public class ConfigOptions {
+ "we would fsync after every message; if it were 5 we would fsync after every "
+ "five messages.");

/**
 * Interval between two consecutive updates of the persisted log recovery point, i.e. the
 * on-disk record of the last flush. Configured under the key
 * {@code log.flush.offset.checkpoint-interval}; defaults to 60 seconds.
 */
public static final ConfigOption<Duration> LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL =
key("log.flush.offset.checkpoint-interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"The frequency with which we update the persistent record of the last "
+ "flush which acts as the log recovery point. The default "
+ "setting is 60 seconds.");

public static final ConfigOption<Duration> LOG_REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL =
key("log.replica.high-watermark.checkpoint-interval")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -104,6 +105,7 @@ public final class LogManager extends TabletManagerBase {
private final Map<TableBucket, LogTablet> currentLogs = new ConcurrentHashMap<>();

private volatile Map<File, OffsetCheckpointFile> recoveryPointCheckpoints;
private volatile ScheduledFuture<?> recoveryPointCheckpointTask;
private boolean loadLogsCompletedFlag = false;

private LogManager(
Expand Down Expand Up @@ -146,7 +148,17 @@ public static LogManager create(
public void startup() {
    // Logs are loaded first; the periodic checkpoint task is only registered afterwards.
    loadAllLogs();

    // TODO add more scheduler, like log-flusher etc.
    // The configured interval is used both as the initial delay and as the repeat period,
    // so the first checkpoint is written one full interval after startup.
    final long periodMs =
            conf.get(ConfigOptions.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL).toMillis();
    LOG.info("Starting log recovery point checkpoint with a period of {} ms.", periodMs);

    // Bind the no-arg overload explicitly; the handle is kept so shutdown() can cancel it.
    final Runnable checkpointAllDirs = this::checkpointRecoveryOffsets;
    recoveryPointCheckpointTask =
            scheduler.schedule(
                    "fluss-recovery-point-checkpoint", checkpointAllDirs, periodMs, periodMs);
}

private void initializeCheckpointMaps() throws IOException {
Expand Down Expand Up @@ -447,6 +459,10 @@ private LogTablet loadLog(
/** Close all the logs. */
public void shutdown() {
LOG.info("Shutting down LogManager.");
if (recoveryPointCheckpointTask != null) {
recoveryPointCheckpointTask.cancel(false);
recoveryPointCheckpointTask = null;
}

Map<File, List<LogTablet>> logsByDataDir = new LinkedHashMap<>();
for (File dataDir : dataDirs) {
Expand Down Expand Up @@ -616,6 +632,13 @@ void checkpointRecoveryOffsets(File dataDir) {
.collect(Collectors.toList()));
}

/**
 * Checkpoints the recovery offsets of every directory in {@code dataDirs}, one directory at
 * a time, by delegating to the single-directory overload.
 */
@VisibleForTesting
void checkpointRecoveryOffsets() {
    for (final File dir : dataDirs) {
        checkpointRecoveryOffsets(dir);
    }
}

private void checkpointRecoveryOffsets(File dataDir, List<LogTablet> logs) {
try {
Map<TableBucket, Long> recoveryOffsets = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.apache.fluss.server.zk.ZooKeeperExtension;
import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.clock.SystemClock;
import org.apache.fluss.utils.concurrent.FlussScheduler;
import org.apache.fluss.utils.concurrent.Scheduler;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -50,10 +52,18 @@
import javax.annotation.Nullable;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.fluss.record.TestData.ANOTHER_DATA1;
import static org.apache.fluss.record.TestData.DATA1;
Expand All @@ -68,6 +78,7 @@
import static org.apache.fluss.server.log.LogManager.CLEAN_SHUTDOWN_FILE;
import static org.apache.fluss.testutils.DataTestUtils.assertLogRecordsEquals;
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link LogManager}. */
Expand Down Expand Up @@ -353,6 +364,139 @@ void testCheckpointRecoveryPointsAreWrittenPerDirectory() throws Exception {
assertThat(dir2Checkpoints.get(tableBucket2)).isEqualTo(log2.getRecoveryPoint());
}

/**
 * Verifies that {@code startup()} registers the recovery point checkpoint task using the
 * configured interval as both delay and period, and that running the registered task writes
 * the log's current recovery point to the checkpoint file.
 */
@Test
void testPeriodicRecoveryPointCheckpoint() throws Exception {
    // Swap the fixture's manager for one backed by a scheduler that only records tasks, so
    // the checkpoint can be triggered by hand instead of waiting on a timer.
    logManager.shutdown();
    logManager = null;
    localDiskManager.close();
    localDiskManager = LocalDiskManager.create(conf);

    RecordingScheduler scheduler = new RecordingScheduler();
    logManager =
            LogManager.create(
                    conf,
                    zkClient,
                    scheduler,
                    SystemClock.getInstance(),
                    TestingMetricGroups.TABLET_SERVER_METRICS,
                    localDiskManager);
    logManager.startup();

    // Derive the expectation from the same configuration that was handed to the manager
    // rather than repeating the option's default as a literal; the assertion then keeps
    // holding if the default changes or the configuration carries a different value.
    long expectedIntervalMs =
            conf.get(ConfigOptions.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL).toMillis();

    RecordingScheduledTask checkpointTask =
            scheduler.getTask("fluss-recovery-point-checkpoint");
    // The cast selects the plain Object overload of assertThat.
    assertThat((Object) checkpointTask).isNotNull();
    assertThat(checkpointTask.getDelayMs()).isEqualTo(expectedIntervalMs);
    assertThat(checkpointTask.getPeriodMs()).isEqualTo(expectedIntervalMs);

    initTableBuckets(null);
    LogTablet log1 = getOrCreateLog(tablePath1, null, tableBucket1);
    log1.appendAsLeader(genMemoryLogRecordsByObject(DATA1));
    log1.flush(false);

    // Run the recorded task once, exactly as the scheduler would on a tick.
    checkpointTask.run();

    Map<TableBucket, Long> checkpoints =
            new OffsetCheckpointFile(
                            new File(tempDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE))
                    .read();
    assertThat(checkpoints).containsEntry(tableBucket1, log1.getRecoveryPoint());
}

/**
 * Runs the manager against a started {@link FlussScheduler} with a 10 ms checkpoint interval
 * and waits until the checkpoint file contains the log's recovery point.
 */
@Test
void testLogManagerPeriodicallyCheckpointsRecoveryPoints() throws Exception {
    // Tear down the fixture's manager so a new one can be built with a short interval.
    logManager.shutdown();
    logManager = null;
    localDiskManager.close();

    conf.set(ConfigOptions.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL, Duration.ofMillis(10));
    localDiskManager = LocalDiskManager.create(conf);

    FlussScheduler realScheduler = new FlussScheduler(1);
    realScheduler.startup();
    try {
        logManager =
                LogManager.create(
                        conf,
                        zkClient,
                        realScheduler,
                        SystemClock.getInstance(),
                        TestingMetricGroups.TABLET_SERVER_METRICS,
                        localDiskManager);
        logManager.startup();

        initTableBuckets(null);
        LogTablet tablet = getOrCreateLog(tablePath1, null, tableBucket1);
        tablet.appendAsLeader(genMemoryLogRecordsByObject(DATA1));
        tablet.flush(false);

        File checkpointPath = new File(tempDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE);
        waitUntil(
                () -> {
                    // Re-read the file on every poll; it is written by the scheduler thread.
                    Map<TableBucket, Long> persisted =
                            new OffsetCheckpointFile(checkpointPath).read();
                    if (!persisted.containsKey(tableBucket1)) {
                        return false;
                    }
                    return persisted.get(tableBucket1).equals(tablet.getRecoveryPoint());
                },
                Duration.ofSeconds(2),
                "Timed out waiting for periodic recovery point checkpoint.");
    } finally {
        // Always stop the scheduler created by this test, even when the wait times out.
        realScheduler.shutdown();
    }
}

/**
 * Checkpoints a recovery point that lies beyond the first segment, appends garbage to that
 * first segment while the manager is down, and asserts that the segment file still has the
 * garbage-extended length after the manager is restarted without the clean shutdown marker.
 */
@Test
void testRecoveryPointCheckpointSkipsFlushedSegmentsDuringRecovery() throws Exception {
    // Phase 1: fill the first segment and remember its pristine size.
    initTableBuckets(null);
    LogTablet tablet = getOrCreateLog(tablePath1, null, tableBucket1);
    tablet.appendAsLeader(genMemoryLogRecordsByObject(DATA1));

    File segment0 = FlussPaths.logFile(tablet.getLogDir(), 0L);
    assertThat(segment0).exists();
    long pristineSize = segment0.length();

    // Phase 2: roll to a new segment, write into it and flush so the recovery point moves
    // past the base offset of the rolled segment.
    tablet.roll(Optional.empty());
    long activeBaseOffset = tablet.activeLogSegment().getBaseOffset();
    tablet.appendAsLeader(genMemoryLogRecordsByObject(ANOTHER_DATA1));
    tablet.flush(false);
    long flushedUpTo = tablet.getRecoveryPoint();
    assertThat(flushedUpTo).isGreaterThan(activeBaseOffset);

    // Phase 3: persist the recovery point and confirm it landed in the checkpoint file.
    logManager.checkpointRecoveryOffsets(tempDir);
    File checkpointPath = new File(tempDir, LogManager.RECOVERY_POINT_CHECKPOINT_FILE);
    Map<TableBucket, Long> persisted = new OffsetCheckpointFile(checkpointPath).read();
    assertThat(persisted).containsEntry(tableBucket1, flushedUpTo);

    // Phase 4: stop the manager, then remove the clean shutdown marker; presumably this
    // makes the restart below go through log recovery.
    logManager.shutdown();
    logManager = null;
    Files.deleteIfExists(new File(tempDir, CLEAN_SHUTDOWN_FILE).toPath());

    // Phase 5: tamper with the first segment, which lies entirely below the recovery point.
    assertThat(segment0.length()).isEqualTo(pristineSize);
    appendInvalidBytes(segment0);
    long tamperedSize = segment0.length();
    assertThat(tamperedSize).isGreaterThan(pristineSize);

    localDiskManager.close();
    localDiskManager = LocalDiskManager.create(conf);

    // Phase 6: restart. An unchanged file length shows the tampered segment was not
    // truncated or rewritten during startup.
    logManager =
            LogManager.create(
                    conf,
                    zkClient,
                    new FlussScheduler(1),
                    SystemClock.getInstance(),
                    TestingMetricGroups.TABLET_SERVER_METRICS,
                    localDiskManager);
    logManager.startup();

    assertThat(logManager.getLog(tableBucket1)).isPresent();
    assertThat(segment0.length()).isEqualTo(tamperedSize);
}

@Test
@Tag(ServerTestTags.JBOD_MULTI_DIR_TAG)
void testPerDirectoryCleanShutdownAndRecovery() throws Exception {
Expand Down Expand Up @@ -425,6 +569,96 @@ private FetchDataInfo readLog(LogTablet log) throws Exception {
return log.read(0, Integer.MAX_VALUE, FetchIsolation.LOG_END, true, null, null);
}

/**
 * Appends the four bytes {@code 0, 1, 2, 3} to the end of {@code file}; callers use this to
 * tamper with a log segment.
 *
 * @param file the file to extend
 * @throws IOException if the file cannot be opened or written
 */
private static void appendInvalidBytes(File file) throws IOException {
    final byte[] garbage = {0, 1, 2, 3};
    // The 'true' argument opens the stream in append mode, so existing content is kept.
    try (FileOutputStream appender = new FileOutputStream(file, true)) {
        appender.write(garbage);
    }
}

/**
 * {@link Scheduler} stub that never executes anything on its own: every scheduled task is
 * stored under its name so a test can inspect it and run it by hand.
 */
private static final class RecordingScheduler implements Scheduler {
    /** Most recently scheduled task per name; a later task with the same name replaces it. */
    private final Map<String, RecordingScheduledTask> scheduledByName = new HashMap<>();

    @Override
    public void startup() {
        // Nothing to start.
    }

    @Override
    public void shutdown() {
        // Nothing to stop.
    }

    @Override
    public ScheduledFuture<?> schedule(
            String name, Runnable task, long delayMs, long periodMs) {
        RecordingScheduledTask recorded = new RecordingScheduledTask(task, delayMs, periodMs);
        scheduledByName.put(name, recorded);
        return recorded;
    }

    /** Returns the task recorded under {@code name}, or {@code null} if there is none. */
    private RecordingScheduledTask getTask(String name) {
        return scheduledByName.get(name);
    }
}

/**
 * {@link ScheduledFuture} stub that remembers the runnable and the timing it was scheduled
 * with. It never runs by itself; tests trigger it through {@link #run()}.
 */
private static final class RecordingScheduledTask implements ScheduledFuture<Void> {
    private final Runnable action;
    private final long delayMs;
    private final long periodMs;
    /** Set once {@link #cancel(boolean)} has been called. */
    private boolean cancelled;

    private RecordingScheduledTask(Runnable task, long delayMs, long periodMs) {
        this.action = task;
        this.delayMs = delayMs;
        this.periodMs = periodMs;
    }

    /** Initial delay, in milliseconds, that was passed when scheduling. */
    private long getDelayMs() {
        return this.delayMs;
    }

    /** Repeat period, in milliseconds, that was passed when scheduling. */
    private long getPeriodMs() {
        return this.periodMs;
    }

    /** Executes the recorded runnable synchronously on the calling thread. */
    private void run() {
        action.run();
    }

    /** Always reports the originally requested delay; no time ever elapses for this stub. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // Converting milliseconds to milliseconds is the identity, so use the field directly.
        final long mine = delayMs;
        final long theirs = o.getDelay(TimeUnit.MILLISECONDS);
        return Long.compare(mine, theirs);
    }

    /** Marks the task as cancelled and always reports success. */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        this.cancelled = true;
        return true;
    }

    @Override
    public boolean isCancelled() {
        return this.cancelled;
    }

    /** The stub counts as done exactly when it has been cancelled. */
    @Override
    public boolean isDone() {
        return isCancelled();
    }

    @Override
    public Void get() {
        return null;
    }

    @Override
    public Void get(long timeout, TimeUnit unit) {
        return null;
    }
}

@AfterEach
public void tearDown() throws Exception {
if (logManager != null) {
Expand Down
1 change: 1 addition & 0 deletions website/docs/maintenance/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ during the Fluss cluster working.
| log.index.interval-size | MemorySize | 4k | This setting controls how frequently Fluss adds an index entry to its offset index. The default setting ensures that we index a message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact position in the log but makes the index larger. You probably don't need to change this. |
| log.file-preallocate | Boolean | false | True if we should preallocate the file on disk when creating a new log segment. |
| log.flush.interval-messages | Long | Long.MAX_VALUE | This setting allows specifying an interval at which we will force a fsync of data written to the log. For example if this was set to 1, we would fsync after every message; if it were 5 we would fsync after every five messages. |
| log.flush.offset.checkpoint-interval | Duration | 1min | The frequency with which we update the persistent record of the last flush which acts as the log recovery point. The default setting is 60 seconds. |
| log.replica.high-watermark.checkpoint-interval | Duration | 5s | The frequency with which the high watermark is saved out to disk. The default setting is 5 seconds. |
| log.replica.max-lag-time | Duration | 30s | If a follower replica hasn't sent any fetch log requests or hasn't consumed up to the leader's log end offset for at least this time, the leader will remove the follower replica from the ISR. |
| log.replica.write-operation-purge-number | Integer | 1000 | The purge number (in number of requests) of the write operation manager, the default value is 1000. |
Expand Down
Loading