Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into fix_remove_processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gaobinlong committed Sep 19, 2023
2 parents fd07fdf + 5b864c0 commit eaa9366
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 34 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `com.google.cloud:google-cloud-core-http` from 2.21.1 to 2.23.0 ([#9971](https://github.com/opensearch-project/OpenSearch/pull/9971))
- Bump `mockito` from 5.4.0 to 5.5.0 ([#10022](https://github.com/opensearch-project/OpenSearch/pull/10022))
- Bump `bytebuddy` from 1.14.3 to 1.14.7 ([#10022](https://github.com/opensearch-project/OpenSearch/pull/10022))
- Bump `com.zaxxer:SparseBitSet` from 1.2 to 1.3 ([#10098](https://github.com/opensearch-project/OpenSearch/pull/10098))

### Changed
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))
Expand All @@ -107,4 +108,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.11...2.x
2 changes: 1 addition & 1 deletion plugins/ingest-attachment/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ dependencies {
// Microsoft Word files with visio diagrams
api 'org.apache.commons:commons-math3:3.6.1'
// POIs dependency
api 'com.zaxxer:SparseBitSet:1.2'
api 'com.zaxxer:SparseBitSet:1.3'
}

restResources {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
533eac055afe3d5f614ea95e333afd6c2bde8f26
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class RemoteFsTranslog extends Translog {
private final BooleanSupplier primaryModeSupplier;
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
private volatile long maxRemoteTranslogGenerationUploaded;
private final Object uploadMutex = new Object();

private volatile long minSeqNoToKeep;

Expand Down Expand Up @@ -237,11 +238,20 @@ public static TranslogTransferManager buildTranslogTransferManager(

@Override
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation);
try {
boolean shouldUpload = false;
try (ReleasableLock ignored = writeLock.acquire()) {
assert location.generation <= current.getGeneration();
if (location.generation == current.getGeneration()) {
ensureOpen();
if (prepareForUpload(location.generation) == false) {
return false;
}
shouldUpload = true;
}
}
if (shouldUpload) {
return performUpload(primaryTermSupplier.getAsLong(), location.generation);
}
} catch (final Exception ex) {
closeOnTragicEvent(ex);
Expand All @@ -256,10 +266,12 @@ public void rollGeneration() throws IOException {
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
return;
}
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
if (prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
}
}

private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
private boolean prepareForUpload(Long generation) throws IOException {
try (Releasable ignored = writeLock.acquire()) {
if (generation == null || generation == current.getGeneration()) {
try {
Expand All @@ -275,23 +287,41 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
closeOnTragicEvent(e);
throw e;
}
} else if (generation < current.getGeneration()) {
return false;
}
return true;
} else return generation >= current.getGeneration();
}
}

// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
if (generation == null) {
if (closed.get() == false) {
return upload(primaryTerm, current.getGeneration() - 1);
/**
* This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized
* is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the
* underlying translog readers are not deleted and the current writer is not converted to a reader at the time of
* upload.
*
* @param primaryTerm current primary term
* @param generation current generation
* @return true if upload is successful
* @throws IOException if the upload fails due to any underlying exceptions.
*/
private boolean performUpload(Long primaryTerm, Long generation) throws IOException {
synchronized (uploadMutex) {
try (Releasable ignored = readLock.acquire()) {
// Do we need remote writes in sync fashion ?
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
// and also verify for same during primary-primary relocation
// Writing remote in sync fashion doesn't hurt as global ckp update
// is not updated in remote translog except in primary to primary recovery.
long generationToUpload;
if (generation == null) {
if (closed.get() == false) {
generationToUpload = current.getGeneration() - 1;
} else {
generationToUpload = current.getGeneration();
}
} else {
return upload(primaryTerm, current.getGeneration());
generationToUpload = generation;
}
} else {
return upload(primaryTerm, generation);
return upload(primaryTerm, generationToUpload);
}
}
}
Expand Down Expand Up @@ -343,8 +373,8 @@ private boolean syncToDisk() throws IOException {
@Override
public void sync() throws IOException {
try {
if (syncToDisk() || syncNeeded()) {
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) {
performUpload(primaryTermSupplier.getAsLong(), null);
}
} catch (final Exception e) {
tragedy.setTragicException(e);
Expand Down Expand Up @@ -528,22 +558,23 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
transferReleasable.close();
closeFilesIfNoPendingRetentionLocks();
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw (RuntimeException) ex;
}
}

@Override
public void close() {
transferReleasable.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration);

assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term";
assert this.generation == highestGeneration : " inconsistent generation ";
final long finalHighestGeneration = highestGeneration;
assert LongStream.iterate(lowestGeneration, i -> i + 1)
.limit(highestGeneration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded();
long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis();

try {
try (translogTransferListener) {
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
if (toUpload.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
* @opensearch.internal
*/
public interface TranslogTransferListener {
public interface TranslogTransferListener extends AutoCloseable {
/**
* Invoked when the transfer of {@link TransferSnapshot} succeeds
* @param transferSnapshot the transfer snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@
import static org.mockito.Mockito.when;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")

public class RemoteFsTranslogTests extends OpenSearchTestCase {

protected final ShardId shardId = new ShardId("index", "_na_", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) {
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
translogTransferFailed.incrementAndGet();
}

@Override
public void close() {}
}));
assertEquals(4, fileTransferSucceeded.get());
assertEquals(0, fileTransferFailed.get());
Expand Down

0 comments on commit eaa9366

Please sign in to comment.