From a57054b371f788a60a5c451b14f402544c1448af Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 22 Apr 2024 11:09:08 +0530 Subject: [PATCH] Incoporate PR review feedback Signed-off-by: Ashish Singh --- .../remote/IndexMetadataUploadListener.java | 3 +-- .../remote/RemoteClusterStateService.java | 19 +++++-------------- .../index/remote/RemoteIndexPathUploader.java | 18 ++++++++++++++++-- .../main/java/org/opensearch/node/Node.java | 6 +++--- .../blobstore/ChecksumBlobStoreFormat.java | 3 +-- .../remote/RemoteIndexPathUploaderTests.java | 2 -- 6 files changed, 26 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java index a19b7aea9491c..64cd27858c6a8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java +++ b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadListener.java @@ -11,7 +11,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.core.action.ActionListener; -import java.io.IOException; import java.util.List; /** @@ -30,7 +29,7 @@ public interface IndexMetadataUploadListener { * @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload). * @param actionListener listener to be invoked on success or failure. */ - void beforeNewIndexUpload(List indexMetadataList, ActionListener actionListener) throws IOException; + void beforeNewIndexUpload(List indexMetadataList, ActionListener actionListener); String getThreadpoolName(); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index a44aa1ce21153..a7fbf752108e2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -524,7 +524,7 @@ private List writeIndexMetadataParallel( } /** - * Invokes the index metadata upload listener. + * Invokes the index metadata upload listener but does not wait for the execution to complete. */ private void invokeIndexMetadataUploadListeners( List newIndexMetadataList, @@ -537,19 +537,10 @@ private void invokeIndexMetadataUploadListeners( String threadPoolName = listener.getThreadpoolName(); assert ThreadPool.THREAD_POOL_TYPES.containsKey(threadPoolName) && ThreadPool.Names.SAME.equals(threadPoolName) == false; threadpool.executor(threadPoolName).execute(() -> { - try { - listener.beforeNewIndexUpload( - newIndexMetadataList, - getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName) - ); - } catch (IOException e) { - exceptionList.add( - new RemoteStateTransferException( - "Exception occurred while running invokeIndexMetadataUploadListeners in " + listenerName, - e - ) - ); - } + listener.beforeNewIndexUpload( + newIndexMetadataList, + getIndexMetadataUploadActionListener(newIndexMetadataList, latch, exceptionList, listenerName) + ); }); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 1984c3f462300..8165ae594d41e 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -53,6 +53,8 @@ /** * Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory} * and {@link org.opensearch.index.remote.RemoteStoreEnums.DataType} for each shard of an index. + * + * @opensearch.internal */ @ExperimentalApi public class RemoteIndexPathUploader implements IndexMetadataUploadListener { @@ -94,7 +96,7 @@ public String getThreadpoolName() { } @Override - public void beforeNewIndexUpload(List indexMetadataList, ActionListener actionListener) throws IOException { + public void beforeNewIndexUpload(List indexMetadataList, ActionListener actionListener) { if (isRemoteDataAttributePresent == false) { logger.trace("Skipping beforeNewIndexUpload as there are no remote indexes"); actionListener.onResponse(null); @@ -109,7 +111,16 @@ public void beforeNewIndexUpload(List indexMetadataList, ActionLi CountDownLatch latch = new CountDownLatch(latchCount); List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); for (IndexMetadata indexMetadata : eligibleList) { - writeIndexPathAsync(indexMetadata, latch, exceptionList); + try { + writeIndexPathAsync(indexMetadata, latch, exceptionList); + } catch (IOException exception) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, List.of(indexMetadata.getIndex().getName())) + ); + exceptionList.forEach(ex::addSuppressed); + actionListener.onFailure(ex); + return; + } } String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(",")); logger.trace(new ParameterizedMessage("Remote index path upload started for {}", indexNames)); @@ -142,6 +153,9 @@ public void beforeNewIndexUpload(List indexMetadataList, ActionLi } success = true; actionListener.onResponse(null); + } catch (Exception ex) { + actionListener.onFailure(ex); + throw ex; } finally { long tookTimeNs = System.nanoTime() - startTime; logger.trace(new ParameterizedMessage("executed beforeNewIndexUpload status={} tookTimeNs={}", success, tookTimeNs)); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 70357a707d6c0..3e3348ae43a0b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1472,9 +1472,9 @@ public Node start() throws NodeValidationException { if (remoteClusterStateService != null) { remoteClusterStateService.start(); } - final RemoteIndexPathUploader indexCreationListener = injector.getInstance(RemoteIndexPathUploader.class); - if (indexCreationListener != null) { - indexCreationListener.start(); + final RemoteIndexPathUploader remoteIndexPathUploader = injector.getInstance(RemoteIndexPathUploader.class); + if (remoteIndexPathUploader != null) { + remoteIndexPathUploader.start(); } // Load (and maybe upgrade) the metadata stored on disk final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 294022f00a2c1..3e6052a5ef820 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -300,7 +300,6 @@ public BytesReference serialize(final T obj, final String blobName, final Compre ) ) { CodecUtil.writeHeader(indexOutput, codec, VERSION); - XContentType xContentType = XContentType.SMILE; try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { @Override public void close() throws IOException { @@ -309,7 +308,7 @@ public void close() throws IOException { } }; XContentBuilder builder = MediaTypeRegistry.contentBuilder( - xContentType, + XContentType.SMILE, compressor.threadLocalOutputStream(indexOutputOutputStream) ) ) { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java index 07bf041ce8312..c70523cf542a3 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteIndexPathUploaderTests.java @@ -251,8 +251,6 @@ public void testInterceptWithInterruptedExceptionDuringLatchAwait() throws Excep Thread thread = new Thread(() -> { try { remoteIndexPathUploader.beforeNewIndexUpload(indexMetadataList, actionListener); - } catch (IOException e) { - throw new AssertionError(e); } catch (Exception e) { assertTrue(e instanceof InterruptedException); assertEquals("sleep interrupted", e.getMessage());