Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@
package org.apache.pinot.server.predownload;

import io.netty.util.internal.StringUtil;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -172,9 +176,27 @@ public boolean hasSameCRC() {
}
}

public void updateSegmentInfoFromLocal(@Nullable SegmentDirectory segmentDirectory) {
SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null : segmentDirectory.getSegmentMetadata();
_localCrc = (segmentMetadata == null) ? null : segmentMetadata.getCrc();
_localSizeBytes = (segmentDirectory == null) ? 0 : segmentDirectory.getDiskSizeBytes();
/**
* Populates local CRC and size from the segment directory on disk, avoiding mmap of index files.
* Reads CRC from {@code creation.meta} (8 bytes) and computes size via directory traversal.
* Sets {@code _localCrc} to null and {@code _localSizeBytes} to 0 if the segment or its
* creation.meta does not exist.
*/
public void updateSegmentInfoFromLocal(File segDir) {
if (!segDir.isDirectory()) {
LOGGER.warn("Segment path is not a directory: {}", segDir);
return;
}
File creationMeta = SegmentDirectoryPaths.findCreationMetaFile(segDir);
if (creationMeta == null || !creationMeta.exists()) {
return;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a warning log for more information?

}
try (DataInputStream ds = new DataInputStream(new FileInputStream(creationMeta))) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm, the first 8 bytes of creationMeta is CRC? Add a comment above this line could be help.

_localCrc = String.valueOf(ds.readLong());
} catch (IOException e) {
LOGGER.warn("Failed to read creation.meta for segment: {} of table: {}", _segmentName, _tableNameWithType, e);
return;
}
_localSizeBytes = FileUtils.sizeOfDirectory(segDir);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wrap this within a try-catch with IllegalArgumentException: a TOCTOU race could happen between the !segDir.isDirectory() check and the FileUtils.sizeOfDirectory if the directory itself is deleted.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.io.File;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -45,16 +43,6 @@ public PredownloadTableInfo(String tableNameWithType, TableConfig tableConfig, @
_instanceDataManagerConfig = instanceDataManagerConfig;
}

private static void closeSegmentDirectoryQuietly(@Nullable SegmentDirectory segmentDirectory) {
if (segmentDirectory != null) {
try {
segmentDirectory.close();
} catch (Exception e) {
LOGGER.warn("Failed to close SegmentDirectory due to error: {}", e.getMessage());
}
}
}

public TableConfig getTableConfig() {
return _tableConfig;
}
Expand All @@ -64,44 +52,42 @@ public InstanceDataManagerConfig getInstanceDataManagerConfig() {
}

/**
* After loading segment metadata from ZK, try to load from local and check if we are able to skip
* the downloading
* Checks whether the segment already exists locally with a matching CRC, and if so populates
* its local size. Reads CRC directly from {@code creation.meta} and size from directory
* traversal — no mmap of index files is performed (segment loading).
*
* <p>Note: a CRC match does not guarantee segment integrity. If the local segment directory is
* corrupted (e.g. truncated index files), the server startup path handles recovery:
* {@link org.apache.pinot.core.data.manager.BaseTableDataManager#addNewOnlineSegment} calls
* {@link org.apache.pinot.core.data.manager.BaseTableDataManager#tryLoadExistingSegment}, which
* performs a full segment load. If loading fails, it falls back to
* {@link org.apache.pinot.core.data.manager.BaseTableDataManager#downloadAndLoadSegment} to
* re-fetch the segment from deep store or peers.
*
* @param predownloadSegmentInfo SegmentInfo of segment to be loaded
* @return true if already presents, false if needs to be downloaded
* @param predownloadSegmentInfo SegmentInfo of segment to be checked
* @return true if the segment is present with matching CRC (download can be skipped),
* false if it is missing or has a CRC mismatch
Comment on lines +55 to +69
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use md style doc for java doc.

*/
public boolean loadSegmentFromLocal(PredownloadSegmentInfo predownloadSegmentInfo) {
SegmentDirectory segmentDirectory = null;
try {
segmentDirectory = getSegmentDirectory(predownloadSegmentInfo, _instanceDataManagerConfig);
predownloadSegmentInfo.updateSegmentInfoFromLocal(segmentDirectory);
File segDir = predownloadSegmentInfo.getSegmentDataDir(this, true);
if (!segDir.isDirectory()) {
LOGGER.info("Segment: {} of table: {} does not exist", predownloadSegmentInfo.getSegmentName(),
_tableNameWithType);
return false;
}
predownloadSegmentInfo.updateSegmentInfoFromLocal(segDir);

String segmentName = predownloadSegmentInfo.getSegmentName();
// If the segment doesn't exist on server or its CRC has changed, then we
// need to fall back to download the segment from deep store to load it.
if (!predownloadSegmentInfo.hasSameCRC()) {
if (predownloadSegmentInfo.getLocalCrc() == null) {
LOGGER.info("Segment: {} of table: {} does not exist", segmentName, _tableNameWithType);
} else {
LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}", segmentName, _tableNameWithType,
predownloadSegmentInfo.getLocalCrc(), predownloadSegmentInfo.getCrc());
}
return false;
String segmentName = predownloadSegmentInfo.getSegmentName();
if (!predownloadSegmentInfo.hasSameCRC()) {
if (predownloadSegmentInfo.getLocalCrc() == null) {
LOGGER.info("Segment: {} of table: {} has no creation.meta", segmentName, _tableNameWithType);
} else {
LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}", segmentName, _tableNameWithType,
predownloadSegmentInfo.getLocalCrc(), predownloadSegmentInfo.getCrc());
}
LOGGER.info("Skip downloading segment: {} of table: {} as it already exists", segmentName, _tableNameWithType);
return true;
} finally {
closeSegmentDirectoryQuietly(segmentDirectory);
return false;
}
}

@Nullable
private SegmentDirectory getSegmentDirectory(PredownloadSegmentInfo predownloadSegmentInfo,
InstanceDataManagerConfig instanceDataManagerConfig) {
String dataDir = instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableConfig.getTableName();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(instanceDataManagerConfig, _tableConfig, _schema);
indexLoadingConfig.setSegmentTier(predownloadSegmentInfo.getTier());
indexLoadingConfig.setTableDataDir(dataDir);
return predownloadSegmentInfo.initSegmentDirectory(indexLoadingConfig, this);
LOGGER.info("Skip downloading segment: {} of table: {} as it already exists", segmentName, _tableNameWithType);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pinot.server.predownload;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -32,8 +35,6 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
Expand Down Expand Up @@ -216,22 +217,25 @@ public void getSegmentsInfoWithoutCrypterName(PredownloadZKClient predownloadZkC
_predownloadScheduler.getSegmentsInfo();
}

public void loadSegmentsFromLocal() {
// Only segment 3 will be loaded
SegmentDirectory segmentDirectory = mock(SegmentDirectory.class);
SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentDirectory.getSegmentMetadata()).thenReturn(segmentMetadata);
when(segmentDirectory.getDiskSizeBytes()).thenReturn(DISK_SIZE_BYTES);
when(segmentMetadata.getCrc()).thenReturn(String.valueOf(CRC));
public void loadSegmentsFromLocal()
throws Exception {
// Only segment 3 will be loaded — create a real creation.meta with matching CRC
File seg3Dir = Files.createTempDirectory("predownload-seg3-predownload-seg3-").toFile();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

predownload-seg3-predownload-seg3- is it intentional? could be predownload-seg3.

File creationMeta = new File(seg3Dir, "creation.meta");
try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
dos.writeLong(CRC);
dos.writeLong(0L);
}
when(_predownloadTableInfo.loadSegmentFromLocal(eq(_predownloadSegmentInfoList.get(2)))).thenAnswer(
invocation -> {
_predownloadSegmentInfoList.get(2).updateSegmentInfoFromLocal(segmentDirectory);
_predownloadSegmentInfoList.get(2).updateSegmentInfoFromLocal(seg3Dir);
return true;
});
when(_predownloadTableInfo.loadSegmentFromLocal(eq(_predownloadSegmentInfoList.get(0)))).thenReturn(false);
when(_predownloadTableInfo.loadSegmentFromLocal(eq(_predownloadSegmentInfoList.get(1)))).thenReturn(false);

_predownloadScheduler.loadSegmentsFromLocal();
FileUtils.deleteQuietly(seg3Dir);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this in finally? If _predownloadScheduler.loadSegmentsFromLocal() throws, this would not be executed.

assertEquals(_predownloadScheduler._failedSegments.size(), 1);
assertEquals(_predownloadScheduler._failedSegments.iterator().next(),
_predownloadSegmentInfoList.get(0).getSegmentName());
Expand Down Expand Up @@ -271,6 +275,7 @@ public void downloadSegments()
if (!untaredFile.exists() && !untaredFile.mkdirs()) {
throw new IOException("Failed to create directory: " + untaredFile.getAbsolutePath());
}
FileUtils.writeByteArrayToFile(new File(untaredFile, "dummy.idx"), new byte[]{1, 2, 3, 4, 5});
return untaredFile;
});
try (MockedStatic<TarCompressionUtils> tarCompressionUtilsMockedStatic = mockStatic(TarCompressionUtils.class)) {
Expand All @@ -280,6 +285,7 @@ public void downloadSegments()
if (!untaredFile.exists() && !untaredFile.mkdirs()) {
throw new IOException("Failed to create directory: " + untaredFile.getAbsolutePath());
}
FileUtils.writeByteArrayToFile(new File(untaredFile, "dummy.idx"), new byte[]{1, 2, 3, 4, 5});
return List.of(untaredFile);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pinot.server.predownload;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Files;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -27,6 +31,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;


public class PredownloadSegmentInfoTest {
Expand Down Expand Up @@ -69,4 +74,57 @@ public void testGetSegmentDataDir() {
_predownloadSegmentInfo.updateSegmentInfo(metadata);
assertThrows(PredownloadException.class, () -> _predownloadSegmentInfo.getSegmentDataDir(null, true));
}

@Test
public void testUpdateSegmentInfoFromLocalFile()
throws Exception {
PredownloadSegmentInfo segmentInfo = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME);
segmentInfo.updateSegmentInfo(createSegmentZKMetadata());

File tempDir = Files.createTempDirectory("predownload-seg-test-").toFile();
try {
// Non-existent path — logs warning and returns without updating fields
File missingSegDir = new File(tempDir, "missing");
segmentInfo.updateSegmentInfoFromLocal(missingSegDir);
assertNull(segmentInfo.getLocalCrc());
assertEquals(segmentInfo.getLocalSizeBytes(), 0);

// Regular file (not a directory) — logs warning and returns without updating fields
File regularFile = new File(tempDir, "not-a-dir");
assertTrue(regularFile.createNewFile());
segmentInfo.updateSegmentInfoFromLocal(regularFile);
assertNull(segmentInfo.getLocalCrc());
assertEquals(segmentInfo.getLocalSizeBytes(), 0);

// Segment directory exists but has no creation.meta — fields stay at defaults
File segDir = new File(tempDir, SEGMENT_NAME);
assertTrue(segDir.mkdirs());
segmentInfo.updateSegmentInfoFromLocal(segDir);
assertNull(segmentInfo.getLocalCrc());
assertEquals(segmentInfo.getLocalSizeBytes(), 0);

// creation.meta present with matching CRC — fields populated, isDownloaded true
File creationMeta = new File(segDir, "creation.meta");
try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
dos.writeLong(CRC);
dos.writeLong(System.currentTimeMillis());
}
org.apache.commons.io.FileUtils.writeByteArrayToFile(new File(segDir, "columns.psf"), new byte[]{1, 2, 3});
segmentInfo.updateSegmentInfoFromLocal(segDir);
assertEquals(segmentInfo.getLocalCrc(), String.valueOf(CRC));
assertTrue(segmentInfo.getLocalSizeBytes() > 0);
assertTrue(segmentInfo.isDownloaded());

// creation.meta present with different CRC — localCrc set, isDownloaded false
try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
dos.writeLong(CRC + 1);
dos.writeLong(System.currentTimeMillis());
}
segmentInfo.updateSegmentInfoFromLocal(segDir);
assertEquals(segmentInfo.getLocalCrc(), String.valueOf(CRC + 1));
assertFalse(segmentInfo.isDownloaded());
} finally {
org.apache.commons.io.FileUtils.deleteQuietly(tempDir);
}
}
}
Loading
Loading