Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,9 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
}
if hardlinkExistsFiles {
ext := "." + config.ArchiveExtensions[remoteBackup.DataFormat]
partName := strings.TrimPrefix(strings.TrimSuffix(archiveFile, ext), capturedDisk+"_")
// Archive files are named with the original disk prefix (e.g. "default_part1.tar.gz"),
// not the rebalanced disk name, so use `disk` for the trim
partName := strings.TrimPrefix(strings.TrimSuffix(archiveFile, ext), disk+"_")
var foundPart *metadata.Part
var idx int
for i, part := range capturedParts {
Expand Down Expand Up @@ -967,28 +969,35 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
for disk, parts := range table.Parts {
diskPath, diskExists := b.DiskToPathMap[disk]
for i, part := range parts {
// Use per-iteration variable to avoid corrupting the outer loop variable
activeDisk := disk
activeDiskPath := diskPath
if !diskExists && part.RebalancedDisk == "" {
return 0, errors.Errorf("downloadDiffParts: table: `%s`.`%s`, disk: %s, part.Name: %s, part.RebalancedDisk: `%s` not rebalanced", table.Table, table.Database, disk, part.Name, part.RebalancedDisk)
}
if part.RebalancedDisk != "" {
diskPath, diskExists = b.DiskToPathMap[part.RebalancedDisk]
activeDiskPath, diskExists = b.DiskToPathMap[part.RebalancedDisk]
if !diskExists {
return 0, errors.Errorf("downloadDiffParts: table: `%s`.`%s`, disk: %s, part.Name: %s, part.RebalancedDisk: `%s` not rebalanced", table.Table, table.Database, disk, part.Name, part.RebalancedDisk)
}
disk = part.RebalancedDisk
if b.shouldDiskNameSkipByNameOrType(disk, disks) {
activeDisk = part.RebalancedDisk
if b.shouldDiskNameSkipByNameOrType(activeDisk, disks) {
log.Warn().Str("database", table.Database).Str("table", table.Table).Str("rebalancedDisk", part.RebalancedDisk).Msg("skipped")
continue
}
}
newPath := path.Join(diskPath, "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, disk, part.Name)
newPath := path.Join(activeDiskPath, "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, activeDisk, part.Name)
if checkErr := b.checkNewPath(newPath, part); checkErr != nil {
return 0, errors.WithMessage(checkErr, "checkNewPath")
}
if !part.Required {
continue
}
existsPath := path.Join(b.DiskToPathMap[disk], "backup", remoteBackup.RequiredBackup, "shadow", dbAndTableDir, disk, part.Name)
// existsPath must point at the active (rebalanced) disk because that's where
// findDiffFileExist routes the source download for parts with RebalancedDisk set.
// Using the original disk would yield a missing/relative path and produce a
// cross-device hardlink later in makePartHardlinks.
existsPath := path.Join(activeDiskPath, "backup", remoteBackup.RequiredBackup, "shadow", dbAndTableDir, activeDisk, part.Name)
_, statErr := os.Stat(existsPath)
if statErr != nil && !os.IsNotExist(statErr) {
return 0, errors.Wrapf(statErr, "%s stat return error", existsPath)
Expand All @@ -1010,9 +1019,13 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
}
}
partForDownload := part
diskForDownload := disk
// diskForDownload follows the active (rebalanced) disk so findDiffBackupFilesRemote
// computes paths under the same physical disk as existsPath/newPath.
diskForDownload := activeDisk
capturedExistsPath := existsPath
capturedNewPath := newPath
// capturedDisk is the original map key for table.Parts; do not switch to activeDisk
// or table.Parts[capturedDisk][idx] would index the wrong slice.
capturedDisk := disk
idx := i
downloadDiffGroup.Go(func() error {
Expand Down
13 changes: 8 additions & 5 deletions pkg/filesystemhelper/filesystemhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,15 @@ func HardlinkBackupPartsToStorage(backupName string, backupTable metadata.TableM
// ClickHouse creates store/{uuid}/detached/ on ALL disks of the
// storage policy at table load time (MergeTreeData constructor).
// Build the path directly so hardlinks stay on the same filesystem.
if rebalancedDiskPath, hasDisk := diskMap[part.RebalancedDisk]; hasDisk && backupTable.UUID != "" {
dstParentDir = filepath.Join(rebalancedDiskPath, "store", backupTable.UUID[:3], backupTable.UUID)
dstParentDirExists = true
} else {
return errors.Errorf("can't build store path for rebalanced disk %s (uuid=%s)", part.RebalancedDisk, backupTable.UUID)
rebalancedDiskPath, hasDisk := diskMap[part.RebalancedDisk]
if !hasDisk {
return errors.Errorf("rebalanced disk %s not found in diskMap", part.RebalancedDisk)
}
if backupTable.UUID == "" {
return errors.Errorf("table UUID is empty, can't build store path for rebalanced disk %s", part.RebalancedDisk)
}
dstParentDir = filepath.Join(rebalancedDiskPath, "store", backupTable.UUID[:3], backupTable.UUID)
dstParentDirExists = true
}
}
backupDiskPath := diskMap[activeDisk]
Expand Down
1 change: 1 addition & 0 deletions pkg/metadata/table_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (tm *TableMetadata) Save(location string, metadataOnly bool) (uint64, error
newTM.Checksums = tm.Checksums
newTM.Size = tm.Size
newTM.TotalBytes = tm.TotalBytes
newTM.RebalancedFiles = tm.RebalancedFiles
newTM.MetadataOnly = false
}
if err := os.MkdirAll(path.Dir(location), 0750); err != nil {
Expand Down
Loading