fix: replication for data loaded by DFLY LOAD command #6740
Conversation
Pull request overview
Fixes replication correctness when data is introduced via DFLY LOAD (which bypasses journaling) by forcing replicas to fall back to a full resync, and adds regression coverage for both standalone and cluster mode.
Changes:
- Reset per-shard journal backlog/LSN after `DFLY LOAD` to invalidate partial sync offsets.
- Cancel active replica sessions after load so replicas reconnect and perform FULL SYNC.
- Add new integration tests validating replication correctness after snapshot load (standalone + cluster); a rough flow sketch follows.
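For orientation, here is a minimal sketch of the standalone scenario this fix targets, assuming the test suite's async client fixtures; the final consistency assertions are illustrative, not the PR's exact code:

```python
async def replicate_after_dfly_load(c_master, c_replica, master_port, dbfilename):
    # Seed the master and take a DF-format snapshot.
    await c_master.execute_command("DEBUG", "POPULATE", "1000", "key", "100", "RAND")
    await c_master.execute_command("SAVE", "DF", dbfilename)
    await c_master.execute_command("FLUSHALL")

    # Attach a replica while the master is empty.
    await c_replica.execute_command("REPLICAOF", "localhost", str(master_port))

    # DFLY LOAD bypasses the journal; the fix makes the master cancel
    # replica sessions so they reconnect and run a FULL SYNC.
    await c_master.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")

    # After the forced full resync, both sides should see the loaded keys.
    assert await c_master.dbsize() == 1000
    assert await c_replica.dbsize() == 1000
```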
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/server/dflycmd.cc` | Forces replica full resync after DFLY LOAD by invalidating journal offsets and cancelling replica sessions. |
| `tests/dragonfly/replication_test.py` | Adds standalone regression test for replication after snapshot load. |
| `tests/dragonfly/cluster_test.py` | Adds cluster-mode regression test for replication after snapshot load. |
Comments suppressed due to low confidence (2)
tests/dragonfly/replication_test.py:4207
- The fixed `asyncio.sleep(0.5)` is a timing heuristic and can be flaky on slow/loaded CI machines. Prefer waiting on an observable condition (e.g., the replica disconnect/reconnect state transition) with a bounded timeout instead of a hardcoded sleep.

```python
# After DFLY LOAD, the master cancels all replicas to force a full resync.
# Wait for the replica to detect disconnection and complete the new full sync.
await asyncio.sleep(0.5)
await wait_for_replicas_state(c_replica)
```
tests/dragonfly/cluster_test.py:3840
- The fixed `asyncio.sleep(0.5)` can be flaky across environments. Prefer waiting on a concrete signal (role/state change, reconnect, or replica offset progression) with a bounded timeout rather than a hardcoded sleep.

```python
# After DFLY LOAD, the master cancels all replicas to force a full resync.
# Wait for the replica to detect disconnection and complete the new full sync.
await asyncio.sleep(0.5)
await wait_for_replicas_state(r1_node.client)
```
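One way to implement the bounded wait both comments suggest; a minimal sketch in which the helper name, timeout values, and the INFO-based predicate are assumptions for illustration, not code from this PR:

```python
import asyncio


async def wait_until(predicate, timeout=10.0, interval=0.1):
    # Poll an async predicate until it holds or the deadline expires,
    # instead of relying on a fixed sleep that can race on loaded CI.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if await predicate():
            return
        await asyncio.sleep(interval)
    raise TimeoutError("condition not met within timeout")


# Example predicate: the replica reports its master link as up again
# (the INFO field name is an assumption here).
async def replica_link_up(c_replica):
    info = await c_replica.info("replication")
    return info.get("master_link_status") == "up"
```

With such a helper, the fixed `await asyncio.sleep(0.5)` could become `await wait_until(lambda: replica_link_up(c_replica))`.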
| await c_master.execute_command("DEBUG", "POPULATE", "1000", "key", "100", "RAND") | ||
| assert await c_master.dbsize() == 1000 | ||
|
|
||
| await c_master.execute_command("SAVE", "DF", dbfilename) | ||
|
|
||
| await c_master.execute_command("FLUSHALL") | ||
|
|
||
| await c_replica.execute_command("REPLICAOF", "localhost", str(master.port)) | ||
| await wait_available_async(c_replica) | ||
|
|
||
| await c_master.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs") | ||
|
|
This test can be improved. The main issue is that it:
- Calls DEBUG POPULATE
- Saves a snapshot
- Flushes the datastore (so everything is empty)
- Calls REPLICAOF -> nothing gets replicated, since the datastore is empty
- Calls DFLY LOAD -> loads the new snapshot

Wouldn't it be better if:
- You actually had some data in both master and replica before loading a new snapshot
- You streamed data while trying to load the snapshot via DFLY LOAD (see the sketch after this list); this would trigger the bug I explained in the other comment
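A rough sketch of the streaming part of that suggestion, assuming a redis-py-style async client; the key names, write cadence, and function names are placeholders, not the PR's code:

```python
import asyncio


async def stream_writes(c_master, stop: asyncio.Event):
    # Journaled traffic that keeps flowing while the snapshot load runs.
    i = 0
    while not stop.is_set():
        await c_master.set(f"stream:{i}", "x")
        i += 1
        await asyncio.sleep(0.01)


async def load_under_traffic(c_master, dbfilename):
    stop = asyncio.Event()
    writer = asyncio.create_task(stream_writes(c_master, stop))
    try:
        # Interleaving journaled SETs with DFLY LOAD (which bypasses the
        # journal) is what should surface the replication bug described above.
        await c_master.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")
    finally:
        stop.set()
        await writer
```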
@BorysTheDev what's the status of this PR?
This PR is incorrect. I have postponed this task, but I remember it.
Force-pushed ec72e08 to b7d467b
augment review
Force-pushed bc0119e to b9a9795
Force-pushed e3851fe to ac7eecf
```cpp
dfly_cmd_->CancelAllReplicas();
shard_set->RunBriefInParallel([](EngineShard* shard) {
  if (shard->journal())
    journal::StartInThreadAtLsn(1);
```
This is a bug. Even Copilot found it, and you resolved it 🤣 The correct fix here is to call `journal::ClearBuffer();` instead of resetting the LSN to 1.
There is no `journal::ClearBuffer()` in the code. Previously I did `journal::StartInThreadAtLsn(shard_journal->GetLsn() + 1);` and got a review comment asking to make it 1.
We can reduce the number of changed lines in this PR, plus some small fixes.
Force-pushed ac7eecf to 3365240
| LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read; | ||
|
|
||
| // Loaded data bypasses the journal, so force replicas into full sync. | ||
| dfly_cmd_->Shutdown(); |
Shutdown is a tricky name; it just cancels all replicas. But if it ever does something non-restorable, this code will break.
I wanted to use another name, but @kostasrim doesn't like this idea.
I agree, `DflyCmd::Shutdown` should be renamed to `BreakReplication` or `CancelReplicas` or something like that.
Force-pushed 3365240 to 8b41f62
fixes: #6739