@@ -11,6 +11,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import jakarta.ws.rs.core.Response;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -326,6 +327,23 @@ private void runDistributedReindexing(JobExecutionContext jobExecutionContext) t
monitorDistributedJob(jobExecutionContext, distributedJob.getId());

if (searchIndexSink != null) {
// Wait for vector embedding tasks to complete before closing
int pendingVectorTasks = searchIndexSink.getPendingVectorTaskCount();
if (pendingVectorTasks > 0) {
LOG.info("Waiting for {} pending vector embedding tasks to complete", pendingVectorTasks);
boolean vectorComplete = searchIndexSink.awaitVectorCompletion(120);
if (!vectorComplete) {
LOG.warn("Vector embedding wait timed out - some tasks may not be reflected in stats");
}
}

// Flush and wait for pending bulk requests
LOG.info("Flushing sink and waiting for pending bulk requests");
boolean flushComplete = searchIndexSink.flushAndAwait(60);
if (!flushComplete) {
LOG.warn("Sink flush timed out - some requests may not be reflected in stats");
}

searchIndexSink.close();
}
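
The close sequence above assumes the sink drains asynchronous work before shutdown. A minimal sketch of how a BulkSink implementation might back awaitVectorCompletion with a pending-task counter (the field name and poll interval are illustrative assumptions, not the actual sink code):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: each submitted embedding task increments the counter and
// decrements it in a finally block; the waiter polls until zero or timeout.
private final AtomicInteger pendingVectorTasks = new AtomicInteger();

public int getPendingVectorTaskCount() {
  return pendingVectorTasks.get();
}

public boolean awaitVectorCompletion(long timeoutSeconds) {
  long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
  while (pendingVectorTasks.get() > 0) {
    if (System.nanoTime() >= deadline) {
      return false; // timed out; the caller logs a warning and closes anyway
    }
    try {
      Thread.sleep(100); // poll interval is an arbitrary choice for this sketch
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
  return true;
}

flushAndAwait(60) would follow the same deadline pattern after first issuing a flush of any buffered bulk requests.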

@@ -715,17 +733,37 @@ private void finalizeAllEntityReindex(boolean finalSuccess) {
return;
}

// Get already-promoted entities from distributed executor (if running in distributed mode)
Set<String> promotedEntities = Collections.emptySet();
if (distributedExecutor != null && distributedExecutor.getEntityTracker() != null) {
promotedEntities = distributedExecutor.getEntityTracker().getPromotedEntities();
}

// Calculate entities that still need finalization
Set<String> entitiesToFinalize = new HashSet<>(recreateContext.getEntities());
entitiesToFinalize.removeAll(promotedEntities);

if (entitiesToFinalize.isEmpty()) {
LOG.info(
"All {} entities already promoted during execution, skipping finalizeAllEntityReindex",
promotedEntities.size());
recreateContext = null;
return;
}

LOG.info(
"Finalizing {} remaining entities (already promoted: {})",
entitiesToFinalize.size(),
promotedEntities.size());

try {
recreateContext
.getEntities()
.forEach(
entityType -> {
try {
finalizeEntityReindex(entityType, true);
} catch (Exception ex) {
LOG.error("Failed to finalize reindex for entity: {}", entityType, ex);
}
});
for (String entityType : entitiesToFinalize) {
try {
finalizeEntityReindex(entityType, finalSuccess);
} catch (Exception ex) {
LOG.error("Failed to finalize reindex for entity: {}", entityType, ex);
}
}
} finally {
recreateContext = null;
}
@@ -234,8 +234,6 @@ public ExecutionResult execute(ReindexingConfiguration config, ReindexingJobCont
LOG.error("Reindexing failed", e);
listeners.onJobFailed(stats.get(), e);
return ExecutionResult.fromStats(stats.get(), ExecutionResult.Status.FAILED, startTime);
} finally {
cleanup();
}
}

@@ -83,6 +83,7 @@ public class DistributedSearchIndexCoordinator {
private final CollectionDAO collectionDAO;
private final PartitionCalculator partitionCalculator;
private final String serverId;
private EntityCompletionTracker entityTracker;

public DistributedSearchIndexCoordinator(CollectionDAO collectionDAO) {
this.collectionDAO = collectionDAO;
@@ -101,6 +102,15 @@ public CollectionDAO getCollectionDAO() {
return collectionDAO;
}

/**
* Set the entity completion tracker for per-entity index promotion.
*
* @param tracker The entity completion tracker
*/
public void setEntityCompletionTracker(EntityCompletionTracker tracker) {
this.entityTracker = tracker;
}

/**
* Create a new distributed indexing job.
*
@@ -398,6 +408,17 @@ public void completePartition(UUID partitionId, long successCount, long failedCo
successCount,
failedCount);

// Record partition completion for per-entity index promotion
if (entityTracker != null) {
LOG.debug(
"Recording partition completion for entity '{}' (failed={}) in tracker",
record.entityType(),
failedCount > 0);
entityTracker.recordPartitionComplete(record.entityType(), failedCount > 0);
} else {
LOG.debug("Entity tracker is null, skipping per-entity completion tracking");
}

// Check if job should be marked as complete
checkAndUpdateJobCompletion(UUID.fromString(record.jobId()));
}
@@ -466,6 +487,11 @@ public void failPartition(UUID partitionId, String errorMessage) {
MAX_PARTITION_RETRIES,
errorMessage);

// Record partition completion (with failure) for per-entity index promotion
if (entityTracker != null) {
entityTracker.recordPartitionComplete(record.entityType(), true);
}

checkAndUpdateJobCompletion(UUID.fromString(record.jobId()));
}
}
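
The coordinator only calls recordPartitionComplete and relies on the tracker to decide when an entity is done. A sketch of the EntityCompletionTracker contract implied by the call sites in this diff (the real class is part of this PR but its body is not shown here, so treat this shape as an assumption):

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class EntityCompletionTracker {
  private record EntityState(
      int expectedPartitions, AtomicInteger completed, AtomicBoolean anyFailed) {}

  private final UUID jobId; // kept for logging/diagnostics in this sketch
  private final Map<String, EntityState> states = new ConcurrentHashMap<>();
  private final Set<String> promoted = ConcurrentHashMap.newKeySet();
  private volatile BiConsumer<String, Boolean> onEntityComplete; // (entityType, success)

  public EntityCompletionTracker(UUID jobId) {
    this.jobId = jobId;
  }

  public void initializeEntity(String entityType, int partitionCount) {
    states.put(entityType, new EntityState(partitionCount, new AtomicInteger(), new AtomicBoolean()));
  }

  public void setOnEntityComplete(BiConsumer<String, Boolean> callback) {
    this.onEntityComplete = callback;
  }

  public void recordPartitionComplete(String entityType, boolean failed) {
    EntityState state = states.get(entityType);
    if (state == null) {
      return; // entity was never initialized; nothing to track
    }
    if (failed) {
      state.anyFailed().set(true);
    }
    // Fire the callback exactly once, when the last partition reports in.
    if (state.completed().incrementAndGet() == state.expectedPartitions()
        && promoted.add(entityType)) {
      BiConsumer<String, Boolean> callback = onEntityComplete;
      if (callback != null) {
        callback.accept(entityType, !state.anyFailed().get());
      }
    }
  }

  // "Promoted" here means the completion callback has fired for the entity;
  // finalizeAllEntityReindex() uses this set to skip already-handled entities.
  public Set<String> getPromotedEntities() {
    return Set.copyOf(promoted);
  }
}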
@@ -13,7 +13,11 @@

package org.openmetadata.service.apps.bundles.searchIndex.distributed;

import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -29,13 +33,17 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.searchIndex.BulkSink;
import org.openmetadata.service.apps.bundles.searchIndex.CompositeProgressListener;
import org.openmetadata.service.apps.bundles.searchIndex.IndexingFailureRecorder;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingConfiguration;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingJobContext;
import org.openmetadata.service.apps.bundles.searchIndex.ReindexingProgressListener;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.DefaultRecreateHandler;
import org.openmetadata.service.search.EntityReindexContext;
import org.openmetadata.service.search.RecreateIndexHandler;
import org.openmetadata.service.search.ReindexContext;

/**
@@ -110,6 +118,11 @@ public static boolean isCoordinatingJob(UUID jobId) {
private IndexingFailureRecorder failureRecorder;
private BulkSink searchIndexSink;

// Per-entity index promotion
private EntityCompletionTracker entityTracker;
private RecreateIndexHandler recreateIndexHandler;
private ReindexContext recreateContext;

// Reader stats tracking (accumulated across all worker threads)
private final AtomicLong coordinatorReaderSuccess = new AtomicLong(0);
private final AtomicLong coordinatorReaderFailed = new AtomicLong(0);
@@ -366,6 +379,14 @@ public ExecutionResult execute(
// Stats are tracked per-entityType by StageStatsTracker in PartitionWorker
// No need for redundant server-level stats persistence

// Store recreate context for per-entity promotion
this.recreateContext = recreateContext;

// Initialize entity completion tracker for per-entity index promotion
this.entityTracker = new EntityCompletionTracker(jobId);
initializeEntityTracker(jobId, recreateIndex);
coordinator.setEntityCompletionTracker(entityTracker);

// Start lock refresh thread to prevent lock expiration during long-running jobs
lockRefreshThread =
Thread.ofVirtual()
@@ -854,6 +875,15 @@ public SearchIndexJob getJobWithFreshStats() {
return coordinator.getJobWithAggregatedStats(currentJob.getId());
}

/**
* Get the entity completion tracker for checking which entities have been promoted.
*
* @return The entity completion tracker, or null if not initialized
*/
public EntityCompletionTracker getEntityTracker() {
return entityTracker;
}

/**
* Update the staged index mapping for the current job. This mapping tells participant servers
* which staged index to write to for each entity type during index recreation.
@@ -868,6 +898,89 @@ public void updateStagedIndexMapping(Map<String, String> stagedIndexMapping) {
coordinator.updateStagedIndexMapping(currentJob.getId(), stagedIndexMapping);
}

/**
* Initialize the entity completion tracker with partition counts and promotion callback.
*/
private void initializeEntityTracker(UUID jobId, boolean recreateIndex) {
// Count partitions per entity
Map<String, Integer> partitionCountByEntity = new HashMap<>();
List<SearchIndexPartition> allPartitions = coordinator.getPartitions(jobId, null);
for (SearchIndexPartition p : allPartitions) {
partitionCountByEntity.merge(p.getEntityType(), 1, Integer::sum);
}

// Initialize tracking for each entity
for (Map.Entry<String, Integer> entry : partitionCountByEntity.entrySet()) {
entityTracker.initializeEntity(entry.getKey(), entry.getValue());
}

LOG.info(
"Initialized entity tracker for job {} with {} entity types: {}",
jobId,
partitionCountByEntity.size(),
partitionCountByEntity);

// Set up per-entity promotion callback if recreating indices
if (recreateIndex && recreateContext != null) {
this.recreateIndexHandler = Entity.getSearchRepository().createReindexHandler();
entityTracker.setOnEntityComplete(
(entityType, success) -> promoteEntityIndex(entityType, success));
LOG.info(
"Per-entity promotion callback SET for job {} (recreateIndex={}, recreateContext entities={})",
jobId,
recreateIndex,
recreateContext.getEntities());
} else {
LOG.info(
"Per-entity promotion callback NOT set for job {} (recreateIndex={}, recreateContext={})",
jobId,
recreateIndex,
recreateContext != null ? "present" : "null");
}
}
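
Putting the wiring above together, the per-entity flow for a recreate run looks like this (an illustrative call sequence using only names introduced in this PR; the success=false semantics on a failed partition are an assumption):

// One entity type, three partitions:
EntityCompletionTracker tracker = new EntityCompletionTracker(jobId);
tracker.initializeEntity("table", 3);
tracker.setOnEntityComplete((entityType, success) -> promoteEntityIndex(entityType, success));
coordinator.setEntityCompletionTracker(tracker);

// As the coordinator records completions:
tracker.recordPartitionComplete("table", false); // 1 of 3 - no callback
tracker.recordPartitionComplete("table", false); // 2 of 3 - no callback
tracker.recordPartitionComplete("table", true);  // 3 of 3 - fires promoteEntityIndex("table", false),
                                                 // success=false because a partition reported failures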

/**
* Promote a single entity's index when all its partitions complete.
*/
private void promoteEntityIndex(String entityType, boolean success) {
if (recreateIndexHandler == null || recreateContext == null) {
LOG.warn(
"Cannot promote index for entity '{}' - no recreateIndexHandler or recreateContext",
entityType);
return;
}

Optional<String> stagedIndexOpt = recreateContext.getStagedIndex(entityType);
if (stagedIndexOpt.isEmpty()) {
LOG.debug("No staged index for entity '{}', skipping promotion", entityType);
return;
}

try {
EntityReindexContext entityContext =
EntityReindexContext.builder()
.entityType(entityType)
.originalIndex(recreateContext.getOriginalIndex(entityType).orElse(null))
.canonicalIndex(recreateContext.getCanonicalIndex(entityType).orElse(null))
.activeIndex(recreateContext.getOriginalIndex(entityType).orElse(null))
.stagedIndex(stagedIndexOpt.get())
.canonicalAliases(recreateContext.getCanonicalAlias(entityType).orElse(null))
.existingAliases(recreateContext.getExistingAliases(entityType))
.parentAliases(
new HashSet<>(listOrEmpty(recreateContext.getParentAliases(entityType))))
.build();

if (recreateIndexHandler instanceof DefaultRecreateHandler defaultHandler) {
LOG.info("Promoting index for entity '{}' (success={})", entityType, success);
defaultHandler.promoteEntityIndex(entityContext, success);
} else {
recreateIndexHandler.finalizeReindex(entityContext, success);
}
} catch (Exception e) {
LOG.error("Failed to promote index for entity '{}'", entityType, e);
}
}
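
The instanceof dispatch keeps per-entity promotion an optional fast path: handlers that don't support it fall back to the whole-job finalizeReindex. As a rough illustration of what alias-swap promotion typically means (a hypothetical sketch of the idea, not the actual DefaultRecreateHandler implementation; moveAliases and deleteIndex are made-up helpers):

// Hypothetical: on success, point the entity's live aliases at the staged index
// and retire the original; on failure, drop the staged index and keep serving
// from the original.
void promoteEntityIndex(EntityReindexContext ctx, boolean success) {
  if (success) {
    moveAliases(ctx.getStagedIndex(), ctx.getExistingAliases()); // staged index goes live
    deleteIndex(ctx.getOriginalIndex());
  } else {
    deleteIndex(ctx.getStagedIndex()); // reindex failed; original stays active
  }
}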

/**
* Result of job execution.
*/