diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadSegmentInfo.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadSegmentInfo.java
index 4f31d52b5540..820ffed36c4e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadSegmentInfo.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadSegmentInfo.java
@@ -19,16 +19,20 @@
 package org.apache.pinot.server.predownload;
 
 import io.netty.util.internal.StringUtil;
+import java.io.DataInputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
 import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,9 +176,33 @@ public boolean hasSameCRC() {
     }
   }
 
-  public void updateSegmentInfoFromLocal(@Nullable SegmentDirectory segmentDirectory) {
-    SegmentMetadataImpl segmentMetadata = (segmentDirectory == null) ? null : segmentDirectory.getSegmentMetadata();
-    _localCrc = (segmentMetadata == null) ? null : segmentMetadata.getCrc();
-    _localSizeBytes = (segmentDirectory == null) ? 0 : segmentDirectory.getDiskSizeBytes();
+  /**
+   * Populates local CRC and size from the segment directory on disk, avoiding mmap of index files.
+   * Reads CRC from {@code creation.meta} (8 bytes) and computes size via directory traversal.
+   * Always resets {@code _localCrc} to null and {@code _localSizeBytes} to 0 first, so both stay at
+   * those defaults if the segment directory or its creation.meta does not exist or cannot be read.
+   *
+   * @param segDir expected local directory of the segment; may not exist
+   */
+  public void updateSegmentInfoFromLocal(File segDir) {
+    // Reset up front: every early return below must not leave values from a previous call behind.
+    _localCrc = null;
+    _localSizeBytes = 0;
+    if (!segDir.isDirectory()) {
+      LOGGER.warn("Segment path is not a directory: {}", segDir);
+      return;
+    }
+    File creationMeta = SegmentDirectoryPaths.findCreationMetaFile(segDir);
+    if (creationMeta == null || !creationMeta.exists()) {
+      return;
+    }
+    // The CRC is the first long in creation.meta; a truncated file surfaces as an IOException here.
+    try (DataInputStream ds = new DataInputStream(new FileInputStream(creationMeta))) {
+      _localCrc = String.valueOf(ds.readLong());
+    } catch (IOException e) {
+      LOGGER.warn("Failed to read creation.meta for segment: {} of table: {}", _segmentName, _tableNameWithType, e);
+      return;
+    }
+    _localSizeBytes = FileUtils.sizeOfDirectory(segDir);
   }
 }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadTableInfo.java b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadTableInfo.java
index ed51c9468372..f4ff422b0cf0 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadTableInfo.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadTableInfo.java
@@ -20,8 +20,6 @@
 
 import java.io.File;
 import javax.annotation.Nullable;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -45,16 +43,6 @@ public PredownloadTableInfo(String tableNameWithType, TableConfig tableConfig, @
     _instanceDataManagerConfig = instanceDataManagerConfig;
   }
 
-  private static void closeSegmentDirectoryQuietly(@Nullable SegmentDirectory segmentDirectory) {
-    if (segmentDirectory != null) {
-      try {
-        segmentDirectory.close();
-      } catch (Exception e) {
-        LOGGER.warn("Failed to close SegmentDirectory due to error: {}", e.getMessage());
-      }
-    }
-  }
-
   public TableConfig getTableConfig() {
     return _tableConfig;
   }
@@ -64,44 +52,42 @@ public InstanceDataManagerConfig getInstanceDataManagerConfig() {
   }
 
   /**
-   * After loading segment metadata from ZK, try to load from local and check if we are able to skip
-   * the downloading
+   * Checks whether the segment already exists locally with a matching CRC, and if so populates
+   * its local size. Reads CRC directly from {@code creation.meta} and size from directory
+   * traversal — no mmap of index files is performed (segment loading).
+   *
+   * <p>Note: a CRC match does not guarantee segment integrity. If the local segment directory is
+   * corrupted (e.g. truncated index files), the server startup path handles recovery:
+   * {@link org.apache.pinot.core.data.manager.BaseTableDataManager#addNewOnlineSegment} calls
+   * {@link org.apache.pinot.core.data.manager.BaseTableDataManager#tryLoadExistingSegment}, which
+   * performs a full segment load. If loading fails, it falls back to
+   * {@link org.apache.pinot.core.data.manager.BaseTableDataManager#downloadAndLoadSegment} to
+   * re-fetch the segment from deep store or peers.
    *
-   * @param predownloadSegmentInfo SegmentInfo of segment to be loaded
-   * @return true if already presents, false if needs to be downloaded
+   * @param predownloadSegmentInfo SegmentInfo of segment to be checked
+   * @return true if the segment is present with matching CRC (download can be skipped),
+   *     false if it is missing or has a CRC mismatch
    */
   public boolean loadSegmentFromLocal(PredownloadSegmentInfo predownloadSegmentInfo) {
-    SegmentDirectory segmentDirectory = null;
-    try {
-      segmentDirectory = getSegmentDirectory(predownloadSegmentInfo, _instanceDataManagerConfig);
-      predownloadSegmentInfo.updateSegmentInfoFromLocal(segmentDirectory);
+    File segDir = predownloadSegmentInfo.getSegmentDataDir(this, true);
+    if (!segDir.isDirectory()) {
+      LOGGER.info("Segment: {} of table: {} does not exist", predownloadSegmentInfo.getSegmentName(),
+          _tableNameWithType);
+      return false;
+    }
+    predownloadSegmentInfo.updateSegmentInfoFromLocal(segDir);
 
-      String segmentName = predownloadSegmentInfo.getSegmentName();
-      // If the segment doesn't exist on server or its CRC has changed, then we
-      // need to fall back to download the segment from deep store to load it.
-      if (!predownloadSegmentInfo.hasSameCRC()) {
-        if (predownloadSegmentInfo.getLocalCrc() == null) {
-          LOGGER.info("Segment: {} of table: {} does not exist", segmentName, _tableNameWithType);
-        } else {
-          LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}", segmentName, _tableNameWithType,
-              predownloadSegmentInfo.getLocalCrc(), predownloadSegmentInfo.getCrc());
-        }
-        return false;
+    String segmentName = predownloadSegmentInfo.getSegmentName();
+    if (!predownloadSegmentInfo.hasSameCRC()) {
+      if (predownloadSegmentInfo.getLocalCrc() == null) {
+        LOGGER.info("Segment: {} of table: {} has no creation.meta", segmentName, _tableNameWithType);
+      } else {
+        LOGGER.info("Segment: {} of table: {} has crc change from: {} to: {}", segmentName, _tableNameWithType,
+            predownloadSegmentInfo.getLocalCrc(), predownloadSegmentInfo.getCrc());
       }
-      LOGGER.info("Skip downloading segment: {} of table: {} as it already exists", segmentName, _tableNameWithType);
-      return true;
-    } finally {
-      closeSegmentDirectoryQuietly(segmentDirectory);
+      return false;
     }
-  }
-
-  @Nullable
-  private SegmentDirectory getSegmentDirectory(PredownloadSegmentInfo predownloadSegmentInfo,
-      InstanceDataManagerConfig instanceDataManagerConfig) {
-    String dataDir = instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableConfig.getTableName();
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(instanceDataManagerConfig, _tableConfig, _schema);
-    indexLoadingConfig.setSegmentTier(predownloadSegmentInfo.getTier());
-    indexLoadingConfig.setTableDataDir(dataDir);
-    return predownloadSegmentInfo.initSegmentDirectory(indexLoadingConfig, this);
+    LOGGER.info("Skip downloading segment: {} of table: {} as it already exists", segmentName, _tableNameWithType);
+    return true;
   }
 }
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java
index c33f7498a3ab..f8b7184cbe54 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSchedulerTest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pinot.server.predownload;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -32,8 +35,6 @@
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
@@ -216,22 +217,25 @@ public void getSegmentsInfoWithoutCrypterName(PredownloadZKClient predownloadZkC
     _predownloadScheduler.getSegmentsInfo();
   }
 
-  public void loadSegmentsFromLocal() {
-    // Only segment 3 will be loaded
-    SegmentDirectory segmentDirectory = mock(SegmentDirectory.class);
-    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
-    when(segmentDirectory.getSegmentMetadata()).thenReturn(segmentMetadata);
-    when(segmentDirectory.getDiskSizeBytes()).thenReturn(DISK_SIZE_BYTES);
-    when(segmentMetadata.getCrc()).thenReturn(String.valueOf(CRC));
+  public void loadSegmentsFromLocal()
+      throws Exception {
+    // Only segment 3 will be loaded — create a real creation.meta with matching CRC
+    File seg3Dir = Files.createTempDirectory("predownload-seg3-predownload-seg3-").toFile();
+    File creationMeta = new File(seg3Dir, "creation.meta");
+    try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
+      dos.writeLong(CRC);
+      dos.writeLong(0L);
+    }
     when(_predownloadTableInfo.loadSegmentFromLocal(eq(_predownloadSegmentInfoList.get(2)))).thenAnswer(
         invocation -> {
-          _predownloadSegmentInfoList.get(2).updateSegmentInfoFromLocal(segmentDirectory);
+          _predownloadSegmentInfoList.get(2).updateSegmentInfoFromLocal(seg3Dir);
           return true;
         });
     when(_predownloadTableInfo.loadSegmentFromLocal(eq(_predownloadSegmentInfoList.get(0)))).thenReturn(false);
     when(_predownloadTableInfo.loadSegmentFromLocal(eq(_predownloadSegmentInfoList.get(1)))).thenReturn(false);
     _predownloadScheduler.loadSegmentsFromLocal();
 
+    FileUtils.deleteQuietly(seg3Dir);
     assertEquals(_predownloadScheduler._failedSegments.size(), 1);
     assertEquals(_predownloadScheduler._failedSegments.iterator().next(),
         _predownloadSegmentInfoList.get(0).getSegmentName());
@@ -271,6 +275,7 @@ public void downloadSegments()
       if (!untaredFile.exists() && !untaredFile.mkdirs()) {
         throw new IOException("Failed to create directory: " + untaredFile.getAbsolutePath());
       }
+      FileUtils.writeByteArrayToFile(new File(untaredFile, "dummy.idx"), new byte[]{1, 2, 3, 4, 5});
       return untaredFile;
     });
     try (MockedStatic<TarCompressionUtils> tarCompressionUtilsMockedStatic = mockStatic(TarCompressionUtils.class)) {
@@ -280,6 +285,7 @@ public void downloadSegments()
         if (!untaredFile.exists() && !untaredFile.mkdirs()) {
           throw new IOException("Failed to create directory: " + untaredFile.getAbsolutePath());
         }
+        FileUtils.writeByteArrayToFile(new File(untaredFile, "dummy.idx"), new byte[]{1, 2, 3, 4, 5});
         return List.of(untaredFile);
       });
 
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSegmentInfoTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSegmentInfoTest.java
index bfc9c6e5f26e..cb3933581d44 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSegmentInfoTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadSegmentInfoTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.server.predownload;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Files;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -27,6 +31,7 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 
 
 public class PredownloadSegmentInfoTest {
@@ -69,4 +74,64 @@ public void testGetSegmentDataDir() {
     _predownloadSegmentInfo.updateSegmentInfo(metadata);
     assertThrows(PredownloadException.class, () -> _predownloadSegmentInfo.getSegmentDataDir(null, true));
   }
+
+  @Test
+  public void testUpdateSegmentInfoFromLocalFile()
+      throws Exception {
+    PredownloadSegmentInfo segmentInfo = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME);
+    segmentInfo.updateSegmentInfo(createSegmentZKMetadata());
+
+    File tempDir = Files.createTempDirectory("predownload-seg-test-").toFile();
+    try {
+      // Non-existent path — logs warning and leaves fields at their defaults
+      File missingSegDir = new File(tempDir, "missing");
+      segmentInfo.updateSegmentInfoFromLocal(missingSegDir);
+      assertNull(segmentInfo.getLocalCrc());
+      assertEquals(segmentInfo.getLocalSizeBytes(), 0);
+
+      // Regular file (not a directory) — logs warning and leaves fields at their defaults
+      File regularFile = new File(tempDir, "not-a-dir");
+      assertTrue(regularFile.createNewFile());
+      segmentInfo.updateSegmentInfoFromLocal(regularFile);
+      assertNull(segmentInfo.getLocalCrc());
+      assertEquals(segmentInfo.getLocalSizeBytes(), 0);
+
+      // Segment directory exists but has no creation.meta — fields stay at defaults
+      File segDir = new File(tempDir, SEGMENT_NAME);
+      assertTrue(segDir.mkdirs());
+      segmentInfo.updateSegmentInfoFromLocal(segDir);
+      assertNull(segmentInfo.getLocalCrc());
+      assertEquals(segmentInfo.getLocalSizeBytes(), 0);
+
+      // creation.meta present with matching CRC — fields populated, isDownloaded true
+      File creationMeta = new File(segDir, "creation.meta");
+      try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
+        dos.writeLong(CRC);
+        dos.writeLong(System.currentTimeMillis());
+      }
+      org.apache.commons.io.FileUtils.writeByteArrayToFile(new File(segDir, "columns.psf"), new byte[]{1, 2, 3});
+      segmentInfo.updateSegmentInfoFromLocal(segDir);
+      assertEquals(segmentInfo.getLocalCrc(), String.valueOf(CRC));
+      assertTrue(segmentInfo.getLocalSizeBytes() > 0);
+      assertTrue(segmentInfo.isDownloaded());
+
+      // creation.meta present with different CRC — localCrc set, isDownloaded false
+      try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
+        dos.writeLong(CRC + 1);
+        dos.writeLong(System.currentTimeMillis());
+      }
+      segmentInfo.updateSegmentInfoFromLocal(segDir);
+      assertEquals(segmentInfo.getLocalCrc(), String.valueOf(CRC + 1));
+      assertFalse(segmentInfo.isDownloaded());
+
+      // creation.meta removed after a successful read — stale CRC and size must be cleared
+      assertTrue(creationMeta.delete());
+      segmentInfo.updateSegmentInfoFromLocal(segDir);
+      assertNull(segmentInfo.getLocalCrc());
+      assertEquals(segmentInfo.getLocalSizeBytes(), 0);
+      assertFalse(segmentInfo.isDownloaded());
+    } finally {
+      org.apache.commons.io.FileUtils.deleteQuietly(tempDir);
+    }
+  }
 }
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadTableInfoTest.java b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadTableInfoTest.java
index b41af078e0de..6cbd06b3f46e 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadTableInfoTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/predownload/PredownloadTableInfoTest.java
@@ -18,25 +18,20 @@
  */
 package org.apache.pinot.server.predownload;
 
-import java.io.IOException;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Files;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
-import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
-import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.mockito.MockedStatic;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.server.predownload.PredownloadTestUtil.*;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -68,59 +63,48 @@ public void testGetter() {
   @Test
   public void testLoadSegmentFromLocal()
       throws Exception {
-    PredownloadSegmentInfo predownloadSegmentInfo = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME);
-    SegmentZKMetadata metadata = createSegmentZKMetadata();
-    predownloadSegmentInfo.updateSegmentInfo(metadata);
-    InstanceDataManagerConfig instanceDataManagerConfig = spy(new HelixInstanceDataManagerConfig(_pinotConfiguration));
+    File tempDir = Files.createTempDirectory("predownload-table-test-").toFile();
+    try {
+      PredownloadSegmentInfo predownloadSegmentInfo = new PredownloadSegmentInfo(TABLE_NAME, SEGMENT_NAME);
+      SegmentZKMetadata metadata = createSegmentZKMetadata();
+      predownloadSegmentInfo.updateSegmentInfo(metadata);
 
-    SegmentDirectoryLoader segmentDirectoryLoader = mock(SegmentDirectoryLoader.class);
-    SegmentDirectory segmentDirectory = mock(SegmentDirectory.class);
-    SegmentMetadataImpl segmentMetadataImpl = mock(SegmentMetadataImpl.class);
-    when(segmentDirectory.getSegmentMetadata()).thenReturn(segmentMetadataImpl);
-    when(segmentDirectory.getDiskSizeBytes()).thenReturn(DISK_SIZE_BYTES);
-    when(segmentDirectoryLoader.load(any(), any())).thenReturn(segmentDirectory);
+      when(_tableConfig.getTableName()).thenReturn(TABLE_NAME);
+      when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(tempDir.getAbsolutePath());
+      when(_instanceDataManagerConfig.getTierConfigs()).thenReturn(null);
 
-    // Has segment with same CRC
-    try (MockedStatic<SegmentDirectoryLoaderRegistry> segmentDirectoryLoaderRegistryMockedStatic = mockStatic(
-        SegmentDirectoryLoaderRegistry.class)) {
-      segmentDirectoryLoaderRegistryMockedStatic.when(
-              () -> SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(anyString()))
-          .thenReturn(segmentDirectoryLoader);
-      when(segmentMetadataImpl.getCrc()).thenReturn(String.valueOf(CRC));
-
-      assertTrue(_predownloadTableInfo.loadSegmentFromLocal(predownloadSegmentInfo));
-      assertEquals(predownloadSegmentInfo.getLocalCrc(), String.valueOf(CRC));
-      assertTrue(predownloadSegmentInfo.isDownloaded());
-      assertEquals(predownloadSegmentInfo.getLocalSizeBytes(), DISK_SIZE_BYTES);
-    }
-
-    // Has segment with different CRC
-    try (MockedStatic<SegmentDirectoryLoaderRegistry> segmentDirectoryLoaderRegistryMockedStatic = mockStatic(
-        SegmentDirectoryLoaderRegistry.class)) {
-      segmentDirectoryLoaderRegistryMockedStatic.when(
-              () -> SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(anyString()))
-          .thenReturn(segmentDirectoryLoader);
-      long newCrc = CRC + 1;
-      when(segmentMetadataImpl.getCrc()).thenReturn(String.valueOf(newCrc));
+      // Segment directory does not exist — returns false
+      assertFalse(_predownloadTableInfo.loadSegmentFromLocal(predownloadSegmentInfo));
+      assertFalse(predownloadSegmentInfo.isDownloaded());
 
+      // Segment directory exists but no creation.meta — returns false
+      File segDir = new File(tempDir, TABLE_NAME + "/" + SEGMENT_NAME);
+      segDir.mkdirs();
       assertFalse(_predownloadTableInfo.loadSegmentFromLocal(predownloadSegmentInfo));
-      assertEquals(predownloadSegmentInfo.getLocalCrc(), String.valueOf(newCrc));
       assertFalse(predownloadSegmentInfo.isDownloaded());
-      assertEquals(predownloadSegmentInfo.getLocalSizeBytes(), DISK_SIZE_BYTES);
-    }
 
-    // Does not have segment
-    try (MockedStatic<SegmentDirectoryLoaderRegistry> segmentDirectoryLoaderRegistryMockedStatic = mockStatic(
-        SegmentDirectoryLoaderRegistry.class)) {
-      segmentDirectoryLoaderRegistryMockedStatic.when(
-              () -> SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(anyString()))
-          .thenReturn(segmentDirectoryLoader);
-      when(segmentMetadataImpl.getCrc()).thenReturn(null);
-      doThrow(IOException.class).when(segmentDirectory).close();
+      // creation.meta present with matching CRC — returns true and populates size
+      File creationMeta = new File(segDir, "creation.meta");
+      try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
+        dos.writeLong(CRC);
+        dos.writeLong(System.currentTimeMillis());
+      }
+      org.apache.commons.io.FileUtils.writeByteArrayToFile(new File(segDir, "columns.psf"), new byte[]{1, 2, 3});
+      assertTrue(_predownloadTableInfo.loadSegmentFromLocal(predownloadSegmentInfo));
+      assertEquals(predownloadSegmentInfo.getLocalCrc(), String.valueOf(CRC));
+      assertTrue(predownloadSegmentInfo.isDownloaded());
+      assertTrue(predownloadSegmentInfo.getLocalSizeBytes() > 0);
 
+      // creation.meta present with different CRC — returns false
+      try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(creationMeta))) {
+        dos.writeLong(CRC + 1);
+        dos.writeLong(System.currentTimeMillis());
+      }
       assertFalse(_predownloadTableInfo.loadSegmentFromLocal(predownloadSegmentInfo));
+      assertEquals(predownloadSegmentInfo.getLocalCrc(), String.valueOf(CRC + 1));
       assertFalse(predownloadSegmentInfo.isDownloaded());
-      assertEquals(predownloadSegmentInfo.getLocalSizeBytes(), DISK_SIZE_BYTES);
+    } finally {
+      org.apache.commons.io.FileUtils.deleteQuietly(tempDir);
     }
   }
 }