Automatically repair lost data partitions #17279

Open
zerolbsony wants to merge 31 commits into apache:master from zerolbsony:repair_lost_data_partition

Conversation

@zerolbsony
Contributor

After the ConfigNode restarts, it will automatically start checking whether the data partition tables are complete.

Contributor

Copilot AI left a comment

Pull request overview

This PR introduces an automated “data partition table integrity check + repair” flow intended to run after ConfigNode restart by adding new DataNode RPCs for (1) earliest timeslot discovery and (2) DataPartitionTable regeneration from local TsFiles, plus a ConfigNode-side procedure to orchestrate detection/repair.

Changes:

  • Extend the DataNode Thrift RPC API with partition-table integrity check requests/responses.
  • Add a DataNode-side DataPartitionTableGenerator and supporting utilities (rate limiter, time-partition helpers) to rebuild DataPartitionTable by scanning TsFile resources.
  • Add a ConfigNode procedure and startup/registration hooks to trigger integrity check and write repaired partition tables back via consensus.
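
The leaky-bucket limiter mentioned in the overview can be pictured with a minimal sketch like the one below. It is not the PR's LeakyBucketRateLimiter: the class name `SimpleLeakyBucket`, its methods, and the injectable clock are assumptions made purely for illustration. Work drains at a fixed byte rate, and each request is told how long to wait until earlier requests have drained.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical leaky-bucket limiter; an illustration, not the PR's implementation. */
final class SimpleLeakyBucket {
  private final long bytesPerSecond;
  private final LongSupplier nanoClock;
  // Point in time (clock nanos) at which all previously reserved bytes have drained.
  private long drainedAtNanos;

  SimpleLeakyBucket(long bytesPerSecond, LongSupplier nanoClock) {
    if (bytesPerSecond <= 0) {
      throw new IllegalArgumentException("bytesPerSecond must be positive");
    }
    this.bytesPerSecond = bytesPerSecond;
    this.nanoClock = nanoClock;
    this.drainedAtNanos = nanoClock.getAsLong();
  }

  /** Reserves the given bytes and returns how many nanoseconds the caller should wait first. */
  synchronized long reserve(long bytes) {
    long now = nanoClock.getAsLong();
    long start = Math.max(now, drainedAtNanos);
    long costNanos = (long) (bytes * (1e9 / bytesPerSecond));
    drainedAtNanos = start + costNanos;
    return start - now;
  }

  /** Blocking variant: sleeps for the wait time computed by reserve. */
  void acquire(long bytes) throws InterruptedException {
    long waitNanos = reserve(bytes);
    if (waitNanos > 0) {
      TimeUnit.NANOSECONDS.sleep(waitNanos);
    }
  }
}
```

A scanner would call `acquire(n)` before reading `n` bytes from a resource file; in production the clock would typically be `System::nanoTime`.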

Reviewed changes

Copilot reviewed 29 out of 29 changed files in this pull request and generated 26 comments.

| File | Description |
| --- | --- |
| iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift | Adds new DataNode RPC structs + methods for integrity check and partition-table generation. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/rateLimiter/LeakyBucketRateLimiter.java | Introduces a leaky-bucket throughput limiter used during scanning. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TimePartitionUtils.java | Adds helpers to convert partitionId ↔ time and a new satisfyPartitionId overload. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java | Attempts to prevent duplicate consensus group IDs per time partition. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/DataPartitionTableGeneratorState.java | Adds a status-code enum for generator task state. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java | Adds thread names for the new scan/recovery pools. |
| iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template | Adds new recovery-related configuration parameters. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java | Adds a limiter-aware getDevices method signature. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java | Implements limiter-aware device scanning from .resource files. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java | Implements limiter-aware getDevices (no-op for in-memory index). |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java | Adds getDevices(limiter) delegating to the time index. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | Implements new RPC endpoints and local filesystem scanning + DataPartitionTable serialization. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java | Adds operation types for new RPC error handling. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/partition/DataPartitionTableGenerator.java | Adds generator that rebuilds a DataPartitionTable from local TsFiles. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java | Loads new recovery properties into DataNode config. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | Adds new config fields + accessors for recovery tuning. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java | Adds restart-time background task intended to trigger integrity check. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java | Registers a new procedure type for integrity check. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java | Adds procedure deserialization/dispatch for the new procedure. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DataPartitionTableIntegrityCheckProcedureState.java | Adds state-machine states for the integrity check procedure. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java | Implements the integrity check and repair orchestration procedure. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/ConfigNodeProcedureEnv.java | Adds a new env class (currently appears redundant/unreferenced). |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java | Adds registration-time trigger that can launch the integrity check. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java | Adds a dataPartitionTableIntegrityCheck() convenience API. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java | Loads the new wait-timeout property into ConfigNode config. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java | Adds ConfigNode config field for “wait all DataNodes up” timeout. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java | Wires new sync actions to the new DataNode RPC calls. |
| iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java | Adds new sync request types for integrity check and generation. |
| iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java | Adds logging members (no new test coverage for the new RPCs). |

Comment on lines +3403 to +3406
if (timeslot
< databaseEarliestRegionMap.get(databaseName)) {
databaseEarliestRegionMap.put(databaseName, timeslot);
}
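
If `databaseName` can be missing from the map at this point (the excerpt does not show whether it is pre-populated), `get` returns null and the comparison would throw a NullPointerException on unboxing. One way to sidestep that is sketched below, assuming the values are `Long` timeslot IDs; `EarliestTimeslotTracker` and its methods are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that keeps the smallest timeslot seen per database. */
final class EarliestTimeslotTracker {
  private final Map<String, Long> earliest = new ConcurrentHashMap<>();

  /** Inserts the timeslot if the database is new, otherwise keeps the minimum. */
  void record(String databaseName, long timeslot) {
    // merge is atomic on ConcurrentHashMap and handles the absent-key case.
    earliest.merge(databaseName, timeslot, Long::min);
  }

  /** Returns a copy so callers cannot mutate the tracker's state. */
  Map<String, Long> snapshot() {
    return new HashMap<>(earliest);
  }
}
```

Because the whole update is a single `merge` call, concurrent scanner threads would not need a separate check-then-put step.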
Comment on lines +651 to +652
assignedDataPartition.put(
lostDataPartitionsOfDatabases.stream().findFirst().get(), finalDataPartitionTable);
}

status = TaskStatus.IN_PROGRESS;
return CompletableFuture.runAsync(this::generateDataPartitionTableByMemory);
Contributor Author

Already changed.

Comment on lines +25 to +38
* Environment object for ConfigNode procedures. Provides access to ConfigManager and other
* necessary components.
*/
public class ConfigNodeProcedureEnv {

private final ConfigManager configManager;

public ConfigNodeProcedureEnv(ConfigManager configManager) {
this.configManager = configManager;
}

public ConfigManager getConfigManager() {
return configManager;
}
Contributor Author

Ignore.

Comment on lines +163 to +166
try {
dataPartitionTableCheckFuture.get();
} catch (ExecutionException | InterruptedException e) {
LOGGER.error("Data partition table check task execute failed", e);
Contributor Author

Already changed.
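
When `InterruptedException` and `ExecutionException` share one catch block, the interrupt signal is easy to lose. A minimal sketch of handling them separately follows; `FutureWaiter` and `awaitQuietly` are hypothetical names, and the `System.err` call merely stands in for the project's logger.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical helper showing separate handling of the two failure modes of Future.get(). */
final class FutureWaiter {
  /** Returns true if the task completed normally, false if it failed or the wait was interrupted. */
  static boolean awaitQuietly(Future<?> future) {
    try {
      future.get();
      return true;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can react to it.
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      // The task itself threw; the real cause is wrapped inside the ExecutionException.
      System.err.println("Data partition table check task failed: " + e.getCause());
      return false;
    }
  }
}
```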

int earliestTimeslotsSize = byteBuffer.getInt();
earliestTimeslots = new ConcurrentHashMap<>();
for (int i = 0; i < earliestTimeslotsSize; i++) {
String database = String.valueOf(byteBuffer.getChar());
Contributor Author

Already changed.
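
For context, `byteBuffer.getChar()` reads exactly one two-byte char, so a database name longer than one character would not round-trip through the quoted loop. A minimal length-prefixed sketch using only the JDK is shown below; `EarliestTimeslotCodec` and its wire layout are assumptions for illustration, not the procedure's actual serialization format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical codec: int count, then (int nameLength, UTF-8 name bytes, long timeslot) per entry. */
final class EarliestTimeslotCodec {
  static byte[] serialize(Map<String, Long> earliestTimeslots) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(earliestTimeslots.size());
      for (Map.Entry<String, Long> entry : earliestTimeslots.entrySet()) {
        byte[] name = entry.getKey().getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);
        out.write(name);
        out.writeLong(entry.getValue());
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Map<String, Long> deserialize(ByteBuffer byteBuffer) {
    int size = byteBuffer.getInt();
    Map<String, Long> result = new HashMap<>();
    for (int i = 0; i < size; i++) {
      // Read the full name using its length prefix instead of a single char.
      byte[] name = new byte[byteBuffer.getInt()];
      byteBuffer.get(name);
      long timeslot = byteBuffer.getLong();
      result.put(new String(name, StandardCharsets.UTF_8), timeslot);
    }
    return result;
  }
}
```

Both `DataOutputStream` and a freshly wrapped `ByteBuffer` use big-endian byte order, so the two sides agree without extra configuration.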

Comment on lines +3335 to +3341
try {
Files.list(dataDir.toPath())
.filter(Files::isDirectory)
.forEach(
sequenceTypePath -> {
try {
Files.list(sequenceTypePath)
Contributor Author

Already changed.
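
As background for this snippet: the JDK documentation for `Files.list` notes that the returned stream holds an open directory handle and should be closed, typically with try-with-resources. A minimal sketch of that pattern follows; `DirectoryLister` and `listSubdirectories` are hypothetical names, not code from the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that lists child directories and always closes the directory stream. */
final class DirectoryLister {
  static List<Path> listSubdirectories(Path dir) {
    // try-with-resources closes the underlying directory handle even if filtering fails.
    try (Stream<Path> children = Files.list(dir)) {
      return children.filter(Files::isDirectory).sorted().collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Collecting to a list before the block exits keeps the stream's lifetime short, and nested levels (data directory, then sequence type, then database) can each call the same helper.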

Comment on lines +3169 to +3173
@Override
public TGetEarliestTimeslotsResp getEarliestTimeslots() {
TGetEarliestTimeslotsResp resp = new TGetEarliestTimeslotsResp();

try {
Contributor Author

@zerolbsony zerolbsony Mar 17, 2026

Unnecessary; ignore.

Comment on lines +1381 to +1384
// Trigger integrity check asynchronously
try {
configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
LOGGER.info("Data partition table integrity check procedure submitted successfully");
Contributor Author

This code has some problems and can't pass the tests.


/* Need use these parameters when repair data partition table */
private int partitionTableRecoverWorkerNum = 10;
private int partitionTableRecoverMaxReadBytesPerSecond = 1000;
Contributor Author

Already changed.

Contributor

@CRZbulabula CRZbulabula left a comment

PTAL.


private long forceWalPeriodForConfigNodeSimpleInMs = 100;

private long partitionTableRecoverWaitAllDnUpTimeout = 60000;
Contributor

Rename this parameter so that its unit is explicit.

Suggested change
- private long partitionTableRecoverWaitAllDnUpTimeout = 60000;
+ private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000;

dataPartitionTableCheckExecutor.submit(
() -> {
LOGGER.info(
"Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up");
Contributor

Better to add a prefix to all of the logs you added, e.g.,

Suggested change
- "Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up");
+ "[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up");

() -> {
LOGGER.info(
"Prepare to start dataPartitionTableIntegrityCheck after all datanodes are started up");
// Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeout());
Contributor

Remove this.

latch.countDown();
} else {
LOGGER.info("No running datanodes found, waiting...");
Thread.sleep(5000); // wait 5 seconds, then check again
Contributor

Suggested change
- Thread.sleep(5000); // wait 5 seconds, then check again
+ Thread.sleep(5000);
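
The wait-and-recheck loop quoted above can also be bounded by an overall timeout, such as the "wait all DataNodes up" setting this PR introduces. A minimal sketch follows; `BoundedPoller`, `waitUntil`, and its parameters are hypothetical, and the condition stands in for whatever check reports that running DataNodes exist.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition at a fixed interval until it holds or a deadline passes. */
final class BoundedPoller {
  /** Returns true as soon as the condition holds, false if the timeout elapses first. */
  static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (!condition.getAsBoolean()) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // Never sleep past the deadline.
      long sleepNanos = Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(pollIntervalMs));
      TimeUnit.NANOSECONDS.sleep(sleepNanos);
    }
    return true;
  }
}
```

A caller might pass a 5000 ms poll interval and the configured timeout, then log and skip the integrity check if the method returns false.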

Comment on lines +309 to +318
.forEach(
slot -> {
if (!TimePartitionUtils.satisfyPartitionId(
slot.getStartTime(), earliestTimeslot)) {
lostDataPartitionsOfDatabases.add(database);
LOG.warn(
"Database {} has lost timeslot {} in its data table partition, and this issue needs to be repaired",
database,
earliestTimeslot);
}
Contributor

Please fix this code. The only thing you need to check is whether the database's earliest time slot is greater than the DataNode's.
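
The check described in this comment can be sketched as a single comparison per database. The example assumes both sides are reduced to maps from database name to earliest timeslot ID; `LostPartitionDetector` and its method are hypothetical names. Treating a database that is absent from the ConfigNode's table as lost is also an assumption, not something the review states.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical detector comparing the ConfigNode's earliest timeslots with those reported by DataNodes. */
final class LostPartitionDetector {
  static Set<String> findDatabasesWithLostPartitions(
      Map<String, Long> configNodeEarliest, Map<String, Long> dataNodeEarliest) {
    Set<String> lost = new TreeSet<>();
    for (Map.Entry<String, Long> entry : dataNodeEarliest.entrySet()) {
      Long recorded = configNodeEarliest.get(entry.getKey());
      // Data exists on disk before the earliest slot the partition table knows about.
      if (recorded == null || recorded > entry.getValue()) {
        lost.add(entry.getKey());
      }
    }
    return lost;
  }
}
```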

}

/** Merge DataPartitionTables from all DataNodes into a final table. */
private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) {
Contributor

Some common functions should be extracted for DataPartitionTable and SeriesPartitionTable.

Contributor Author

You can see that two functions have been added to the DataPartitionTable class.
image

getClusterSchemaManager().adjustMaxRegionGroupNum();

// Check if all DataNodes are registered and trigger integrity check if needed
checkAndTriggerIntegrityCheck();
Contributor

Remove this function.

ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(),
new ThreadPoolExecutor.CallerRunsPolicy());

private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>();
Contributor

This could be a local variable.

Contributor Author

The executor is no longer a class-level field.

Contributor

Change this map into a local variable.

Contributor Author

> Change this map into a local variable.

Already changed.

Contributor

@CRZbulabula CRZbulabula left a comment

PTAL.

.min(Comparator.comparingLong(TTimePartitionSlot::getStartTime))
.orElse(null);

if (!TimePartitionUtils.satisfyPartitionId(localEarliestSlot.getStartTime(), earliestTimeslot)) {
Contributor

Check whether earliestTimeSlot.startTime < localEarliestSlot.startTime.

Comment on lines +499 to +500
setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
return Flow.HAS_MORE_STATE;
Contributor

Suggested change
- setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES);
- return Flow.HAS_MORE_STATE;
+ setNextState(DataPartitionTableIntegrityCheckProcedureState.REQUEST_PARTITION_TABLES_HEART_BEAT);
+ return Flow.HAS_MORE_STATE;

extends StateMachineProcedure<
ConfigNodeProcedureEnv, DataPartitionTableIntegrityCheckProcedureState> {

private static final Logger LOG =
Contributor

For all logs with a level above DEBUG, you'd better add the '[DataPartitionIntegrity]' prefix; it makes debugging easier.

/* Need use these parameters when repair data partition table */
private int partitionTableRecoverWorkerNum = 10;
// Rate limit set to 10 MB/s
private int partitionTableRecoverMaxReadBytesPerSecond = 10;
Contributor

Use MaxReadMBs instead of Bytes.
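
One way to read this suggestion: store the limit in megabytes per second under a name that says so, and convert to bytes only where the limiter needs it. The sketch below is illustrative; the class name, the exact field name, and the 1024 * 1024 conversion factor are assumptions, not the final names in IoTDBConfig.

```java
/** Hypothetical config holder that keeps the read limit in MB/s and exposes bytes on demand. */
final class RecoverThrottleConfig {
  private static final long BYTES_PER_MB = 1024L * 1024L;

  // Rate limit in MB/s; the unit is carried in the name rather than in a comment.
  private int partitionTableRecoverMaxReadMBsPerSecond = 10;

  int getPartitionTableRecoverMaxReadMBsPerSecond() {
    return partitionTableRecoverMaxReadMBsPerSecond;
  }

  void setPartitionTableRecoverMaxReadMBsPerSecond(int value) {
    if (value <= 0) {
      throw new IllegalArgumentException("read limit must be positive");
    }
    this.partitionTableRecoverMaxReadMBsPerSecond = value;
  }

  /** Converts the configured limit to bytes per second for a byte-based rate limiter. */
  long maxReadBytesPerSecond() {
    return partitionTableRecoverMaxReadMBsPerSecond * BYTES_PER_MB;
  }
}
```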

ThreadName.DATA_PARTITION_RECOVER_PARALLEL_POOL.getName(),
new ThreadPoolExecutor.CallerRunsPolicy());

private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>();
Contributor

Change this map into a local variable.

private Map<String, Long> databaseEarliestRegionMap = new ConcurrentHashMap<>();

// Must be lower than the RPC request timeout, in milliseconds
private static final long timeoutMs = 50000;
Contributor

DO NOT add member variables lightly.

}

/** Process data directory to find the earliest timeslots for each database. */
private void processDataDirectoryForEarliestTimeslots(
Contributor

Better to log your conclusion, i.e., the earliest timeslot for each database.

switch (currentGenerator.getStatus()) {
case IN_PROGRESS:
setResponseFields(resp, DataPartitionTableGeneratorState.IN_PROGRESS.getCode(), String.format(
"DataPartitionTable generation in progress: %.1f%%",
Contributor

It is good to keep a progress record. You'd better log it as well so that it can be retrieved later.


public class DataNodeInternalRPCServiceImplTest {

private static final Logger LOG =
Contributor

Why change this file?

Contributor Author

I forgot to clean it up. I had written some unit tests locally to test the RPC interfaces.

return startPartition <= partitionId && endPartition >= partitionId;
}

public static boolean satisfyPartitionId(long startTime, long partitionId) {
Contributor

No need to update this class in our current implementation.

3 participants