
Commit

refactor index processing
spbolton committed Sep 21, 2024
1 parent 6e09976 commit ce89f8b
Showing 3 changed files with 223 additions and 191 deletions.
BulkProcessorListener.java
@@ -17,6 +17,10 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -36,6 +40,10 @@ public class BulkProcessorListener implements BulkProcessor.Listener {

private long contentletsIndexed;

private final ConcurrentHashMap<ReindexEntry,ReindexEntry> successful = new ConcurrentHashMap<>();
private final ConcurrentHashMap<ReindexEntry,String> failures = new ConcurrentHashMap<>();
AtomicInteger totalResponses = new AtomicInteger(0);

BulkProcessorListener () {
this.workingRecords = new HashMap<>();
}
@@ -44,6 +52,21 @@ public long getContentletsIndexed(){
return contentletsIndexed;
}

public int getTotalResponses(){
return totalResponses.get();
}

public List<ReindexEntry> getSuccesful(){
return new ArrayList<>(successful.values());
}
public Map<ReindexEntry,String> getFailures(){
return failures;
}
public Map<String, ReindexEntry> getWorkingRecords(){
return workingRecords;
}


@Override
public void beforeBulk(final long executionId, final BulkRequest request) {

@@ -66,11 +89,10 @@ public void beforeBulk(final long executionId, final BulkRequest request) {
@Override
public void afterBulk(final long executionId, final BulkRequest request, final BulkResponse response) {
Logger.debug(this.getClass(), "Bulk process completed");
final List<ReindexEntry> successful = new ArrayList<>();
float totalResponses=0;

for (BulkItemResponse bulkItemResponse : response) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
totalResponses++;
totalResponses.incrementAndGet();
String id;
if (bulkItemResponse.isFailed() || itemResponse == null) {

@@ -90,17 +112,18 @@ public void afterBulk(final long executionId, final BulkRequest request, final B
continue;
}
if (bulkItemResponse.isFailed() || itemResponse == null) {
handleFailure(idx,
failures.put(idx,
"bulk index failure:" + bulkItemResponse.getFailure().getMessage());
} else {
successful.add(idx);
successful.put(idx,idx);
}
}
handleSuccess(successful);

// 50% failure rate forces a rebuild of the BulkProcessor
if(totalResponses==0 || (successful.size() / totalResponses < .5)) {
if(totalResponses.get()==0 || ((double) successful.size() / totalResponses.get() < .5)) {
ReindexThread.rebuildBulkIndexer();
}

}

static String getMatchingReservedIdIfAny(String id) {
@@ -120,26 +143,9 @@ static String getMatchingReservedIdIfAny(String id) {
public void afterBulk(final long executionId, final BulkRequest request, final Throwable failure) {
Logger.error(ReindexThread.class, "Bulk process failed entirely:" + failure.getMessage(),
failure);
workingRecords.values().forEach(idx -> handleFailure(idx, failure.getMessage()));
workingRecords.values().forEach(idx -> failures.put(idx, failure.getMessage()));
}

private void handleSuccess(final List<ReindexEntry> successful) {

try {
if (!successful.isEmpty()) {
APILocator.getReindexQueueAPI().deleteReindexEntry(successful);
CacheLocator.getESQueryCache().clearCache();
}
} catch (DotDataException e) {
Logger.warnAndDebug(this.getClass(), "unable to delete indexjournal:" + e.getMessage(), e);
}
}

private void handleFailure(final ReindexEntry idx, final String cause) {
try {
APILocator.getReindexQueueAPI().markAsFailed(idx, cause);
} catch (DotDataException e) {
Logger.warnAndDebug(this.getClass(), "unable to reque indexjournal:" + idx, e);
}
}
}
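
Note: only one of the three changed files is rendered above, so this commit does not show where the queue bookkeeping removed from handleSuccess and handleFailure now happens. The sketch below is one plausible caller-side shape, assuming something like ReindexThread reads the listener's new accessors (getSuccesful(), getFailures(), getTotalResponses()) after the bulk request finishes and issues the same ReindexQueueAPI calls the deleted methods made. The class name BulkResultHandler and the import paths are assumptions for illustration, not code from this commit.

// Hypothetical caller-side cleanup; mirrors the logic removed from the listener.
// Import paths follow the usual dotCMS layout and are assumed; ReindexEntry and
// BulkProcessorListener are assumed to live in the same package as this class.
import com.dotmarketing.business.APILocator;
import com.dotmarketing.business.CacheLocator;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.util.Logger;
import java.util.List;
import java.util.Map;

final class BulkResultHandler {

    // Deletes successfully indexed entries from the reindex queue, clears the ES
    // query cache, and marks each failed entry so it can be retried or inspected.
    void apply(final BulkProcessorListener listener) {
        final List<ReindexEntry> successful = listener.getSuccesful();
        final Map<ReindexEntry, String> failures = listener.getFailures();

        try {
            if (!successful.isEmpty()) {
                APILocator.getReindexQueueAPI().deleteReindexEntry(successful);
                CacheLocator.getESQueryCache().clearCache();
            }
        } catch (DotDataException e) {
            Logger.warnAndDebug(this.getClass(), "unable to delete indexjournal:" + e.getMessage(), e);
        }

        failures.forEach((entry, cause) -> {
            try {
                APILocator.getReindexQueueAPI().markAsFailed(entry, cause);
            } catch (DotDataException e) {
                Logger.warnAndDebug(this.getClass(), "unable to requeue indexjournal:" + entry, e);
            }
        });
    }
}

Read this way, the refactor keeps database and cache work out of the Elasticsearch callback: afterBulk only records per-item results and the 50% rebuild check, while whoever drives the bulk request decides when to flush successes and failures back to the reindex queue. That reading is an inference from this one file.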
