Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import org.opensearch.be.lucene.index.LuceneCommitterFactory;
import org.opensearch.be.lucene.index.LuceneDeleteExecutionEngine;
import org.opensearch.be.lucene.index.LuceneIndexingExecutionEngine;
import org.opensearch.be.lucene.stats.LuceneShardStats;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DataFormatDescriptor;
import org.opensearch.index.engine.dataformat.DataFormatPlugin;
import org.opensearch.index.engine.dataformat.DeleteExecutionEngine;
import org.opensearch.index.engine.dataformat.DataFormatRegistry;
import org.opensearch.index.engine.dataformat.DeleteExecutionEngine;
import org.opensearch.index.engine.dataformat.IndexingEngineConfig;
import org.opensearch.index.engine.dataformat.IndexingExecutionEngine;
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
Expand Down Expand Up @@ -55,6 +56,7 @@
public class LucenePlugin extends Plugin implements DataFormatPlugin, SearchBackEndPlugin<DirectoryReader>, EnginePlugin {

private static final LuceneDataFormat DATA_FORMAT = new LuceneDataFormat();
private final LuceneShardStats stats = new LuceneShardStats();

/** Creates a new LucenePlugin. */
public LucenePlugin() {}
Expand Down Expand Up @@ -93,7 +95,10 @@ public DataFormat getDataFormat() {
}

@Override
public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(IndexSettings indexSettings, DataFormatRegistry dataFormatRegistry) {
public Map<String, Supplier<DataFormatDescriptor>> getFormatDescriptors(
IndexSettings indexSettings,
DataFormatRegistry dataFormatRegistry
) {
return Map.of(DATA_FORMAT.name(), () -> new DataFormatDescriptor(DATA_FORMAT.name(), new LuceneChecksumHandler()));
}

Expand Down Expand Up @@ -134,11 +139,11 @@ public EngineReaderManager<DirectoryReader> createReaderManager(ReaderManagerCon
*/
@Override
public Optional<CommitterFactory> getCommitterFactory(IndexSettings indexSettings) {
return Optional.of(new LuceneCommitterFactory());
return Optional.of(new LuceneCommitterFactory(stats));
}

@Override
public DeleteExecutionEngine<?> getDeleteExecutionEngine(Committer committer) {
return new LuceneDeleteExecutionEngine(DATA_FORMAT, committer);
return new LuceneDeleteExecutionEngine(DATA_FORMAT, committer, stats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import org.opensearch.be.lucene.index.LuceneReplicaCommitter;
import org.opensearch.common.CheckedBiFunction;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.common.util.concurrent.RefCounted;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.engine.exec.Segment;
Expand Down Expand Up @@ -51,12 +49,19 @@ public class LuceneReaderManager implements EngineReaderManager<DirectoryReader>
/**
* Creates a new LuceneReaderManager.
*
* @param dataFormat the data format this reader manager serves
* @param initialReader the initial DirectoryReader, must not be null
* @param dataFormat the data format this reader manager serves
* @param initialReader the initial DirectoryReader, must not be null
* @param readers shared map of generation to DirectoryReader for segment-level reader reuse
* @param readerRefresher function that opens a refreshed reader given the current reader and new
* {@link SegmentInfos}; returns {@code null} if no refresh is needed
* @throws NullPointerException if initialReader is null
*/
public LuceneReaderManager(DataFormat dataFormat, DirectoryReader initialReader, Map<Long, DirectoryReader> readers,
CheckedBiFunction<DirectoryReader, SegmentInfos, DirectoryReader, IOException> readerRefresher) {
public LuceneReaderManager(
DataFormat dataFormat,
DirectoryReader initialReader,
Map<Long, DirectoryReader> readers,
CheckedBiFunction<DirectoryReader, SegmentInfos, DirectoryReader, IOException> readerRefresher
) {
this.dataFormat = dataFormat;
Objects.requireNonNull(initialReader, "initialReader must not be null");
this.currentReader = initialReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.opensearch.be.lucene.index.LuceneIndexingExecutionEngine;
import org.opensearch.common.CheckedBiFunction;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.engine.dataformat.ReaderManagerConfig;
import org.opensearch.index.engine.exec.EngineReaderManager;
import org.opensearch.index.engine.exec.commit.IndexStoreProvider;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.opensearch.index.engine.NRTReplicationReaderManager.unwrapStandardReader;

/**
* Static helpers for creating Lucene-based {@link EngineReaderManager} instances.
* <p>
Expand Down Expand Up @@ -78,7 +74,7 @@ static EngineReaderManager<DirectoryReader> createReaderManager(ReaderManagerCon
}

private static DirectoryReader buildReader(DirectoryReader oldReader, SegmentInfos newSis) throws IOException {
if (newSis == null || ((StandardDirectoryReader)oldReader).getSegmentInfos().version == newSis.version) {
if (newSis == null || ((StandardDirectoryReader) oldReader).getSegmentInfos().version == newSis.version) {
return null;
}
final List<LeafReader> subs = new ArrayList<>();
Expand All @@ -87,12 +83,6 @@ private static DirectoryReader buildReader(DirectoryReader oldReader, SegmentInf
}
// Segment_n here is ignored because it is either already committed on disk as part of previous commit point or
// does not yet exist on store (not yet committed)
return StandardDirectoryReader.open(
oldReader.directory(),
newSis,
subs,
null,
null
);
return StandardDirectoryReader.open(oldReader.directory(), newSis, subs, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.util.Version;
import org.opensearch.be.lucene.LuceneDataFormat;
import org.opensearch.be.lucene.stats.LuceneShardStats;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.EngineConfig;
Expand All @@ -51,6 +52,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class LuceneCommitter extends SafeBootstrapCommitter {
private final Store store;
private final MergeIndexWriter indexWriter;
private final LuceneCommitDeletionPolicy deletionPolicy;
private final LuceneShardStats stats;
private final AtomicBoolean isClosed = new AtomicBoolean();
// Keyed by catalog snapshot generation — survives snapshot cloning at the upload boundary.
private final Map<Long, DirectoryReader> readers = new ConcurrentHashMap<>();
Expand All @@ -105,10 +108,12 @@ public class LuceneCommitter extends SafeBootstrapCommitter {
* then opens the IndexWriter.
*
* @param committerConfig the committer committerConfig (shard path, index committerConfig, engine config, store)
* @param stats the shard-level stats collector
* @throws IOException if opening the IndexWriter fails
*/
public LuceneCommitter(CommitterConfig committerConfig) throws IOException {
public LuceneCommitter(CommitterConfig committerConfig, LuceneShardStats stats) throws IOException {
super(committerConfig);
this.stats = stats;
this.store = Objects.requireNonNull(committerConfig.engineConfig().getStore());
this.store.incRef();
try {
Expand All @@ -134,13 +139,19 @@ public LuceneCommitter(CommitterConfig committerConfig) throws IOException {
@Override
public synchronized CommitResult commit(CommitInput commitData) throws IOException {
ensureOpen();
indexWriter.setLiveCommitData(commitData.userData());
indexWriter.commit();
SegmentInfos committed = SegmentInfos.readLatestCommit(indexWriter.getDirectory());
long start = System.nanoTime();
try {
indexWriter.setLiveCommitData(commitData.userData());
indexWriter.commit();
SegmentInfos committed = SegmentInfos.readLatestCommit(indexWriter.getDirectory());

// Encode writer's Lucene version as a long — keeps CatalogSnapshot Lucene-type-agnostic.
long version = LuceneVersionConverter.encode(committed.getCommitLuceneVersion());
return new CommitResult(committed.getSegmentsFileName(), committed.getGeneration(), version);
// Encode writer's Lucene version as a long — keeps CatalogSnapshot Lucene-type-agnostic.
long version = LuceneVersionConverter.encode(committed.getCommitLuceneVersion());
return new CommitResult(committed.getSegmentsFileName(), committed.getGeneration(), version);
} finally {
stats.incCommitTotal();
stats.addCommitTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}

/**
Expand Down Expand Up @@ -396,7 +407,12 @@ static Map<IndexCommit, CatalogSnapshot> loadCommittedSnapshots(Store store) thr
// the first real flush — preventing the catalog generation fallback from
// leaking into ReplicationCheckpoint.segmentsGen.
dfa = (DataformatAwareCatalogSnapshot) CatalogSnapshotManager.createInitialSnapshot(
0L, 0L, 0L, List.of(), -1L, ic.getUserData()
0L,
0L,
0L,
List.of(),
-1L,
ic.getUserData()
);
}
SegmentInfos committed = SegmentInfos.readCommit(store.directory(), ic.getSegmentsFileName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.be.lucene.index;

import org.opensearch.be.lucene.stats.LuceneShardStats;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.engine.exec.commit.Committer;
import org.opensearch.index.engine.exec.commit.CommitterConfig;
Expand All @@ -28,8 +29,12 @@
@ExperimentalApi
public final class LuceneCommitterFactory implements CommitterFactory {

/** Creates a new factory instance. */
public LuceneCommitterFactory() {}
private final LuceneShardStats stats;

/** Creates a new factory instance with the given stats collector. */
public LuceneCommitterFactory(LuceneShardStats stats) {
this.stats = stats;
}

/**
* Creates a new {@link LuceneCommitter} for the given settings.
Expand All @@ -42,6 +47,6 @@ public Committer getCommitter(CommitterConfig committerConfig) throws IOExceptio
if (committerConfig.isReplica()) {
return new LuceneReplicaCommitter(committerConfig);
}
return new LuceneCommitter(committerConfig);
return new LuceneCommitter(committerConfig, stats);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.Term;
import org.opensearch.be.lucene.LuceneDataFormat;
import org.opensearch.be.lucene.stats.LuceneShardStats;
import org.opensearch.index.engine.dataformat.DataFormat;
import org.opensearch.index.engine.dataformat.DeleteExecutionEngine;
import org.opensearch.index.engine.dataformat.DeleteInput;
Expand All @@ -26,6 +27,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Lucene-based implementation of {@link DeleteExecutionEngine} that tracks per-generation
Expand All @@ -41,11 +43,13 @@ public class LuceneDeleteExecutionEngine implements DeleteExecutionEngine<DataFo
private final Map<Long, Deleter> generationToDeleterMap;
private final DataFormat dataFormat;
private final LuceneCommitter committer;
private final LuceneShardStats stats;

public LuceneDeleteExecutionEngine(DataFormat dataFormat, Committer committer) {
public LuceneDeleteExecutionEngine(DataFormat dataFormat, Committer committer, LuceneShardStats stats) {
this.generationToDeleterMap = new ConcurrentHashMap<>();
this.dataFormat = dataFormat;
this.committer = (LuceneCommitter) committer;
this.stats = stats;
}

@Override
Expand All @@ -67,13 +71,21 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {

@Override
public DeleteResult deleteDocument(DeleteInput deleteInput) throws IOException {
Deleter deleter = generationToDeleterMap.get(deleteInput.generation());
if (deleter != null) {
return deleter.deleteDoc(deleteInput);
} else {
Term uid = new Term(deleteInput.fieldName(), deleteInput.value());
this.committer.getIndexWriter().deleteDocuments(uid);
return new DeleteResult.Success(1L, 1L, 1L);
long start = System.nanoTime();
try {
Deleter deleter = generationToDeleterMap.get(deleteInput.generation());
if (deleter != null) {
stats.incDeleteByGenerationTotal();
return deleter.deleteDoc(deleteInput);
} else {
stats.incDeleteSharedWriterFallbackTotal();
Term uid = new Term(deleteInput.fieldName(), deleteInput.value());
this.committer.getIndexWriter().deleteDocuments(uid);
return new DeleteResult.Success(1L, 1L, 1L);
}
} finally {
stats.incDeleteTotal();
stats.addDeleteTimeMillis(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}

Expand Down
Loading
Loading