|
23 | 23 | import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
24 | 24 | import org.elasticsearch.action.support.ListenerTimeouts;
|
25 | 25 | import org.elasticsearch.action.support.PlainActionFuture;
|
26 |
| -import org.elasticsearch.action.support.ThreadedActionListener; |
27 | 26 | import org.elasticsearch.client.internal.Client;
|
28 | 27 | import org.elasticsearch.cluster.ClusterName;
|
29 | 28 | import org.elasticsearch.cluster.ClusterState;
|
@@ -176,30 +175,32 @@ public void getSnapshotInfo(GetSnapshotInfoContext context) {
|
176 | 175 | final List<SnapshotId> snapshotIds = context.snapshotIds();
|
177 | 176 | assert snapshotIds.size() == 1 && SNAPSHOT_ID.equals(snapshotIds.iterator().next())
|
178 | 177 | : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds;
|
179 |
| - try { |
180 |
| - getRemoteClusterClient().admin() |
181 |
| - .cluster() |
182 |
| - .prepareState() |
183 |
| - .clear() |
184 |
| - .setMetadata(true) |
185 |
| - .setNodes(true) |
186 |
| - // fork to the snapshot meta pool because the context expects to run on it and asserts that it does |
187 |
| - .execute(new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.SNAPSHOT_META, context.map(response -> { |
188 |
| - Metadata responseMetadata = response.getState().metadata(); |
189 |
| - Map<String, IndexMetadata> indicesMap = responseMetadata.indices(); |
190 |
| - List<String> indices = new ArrayList<>(indicesMap.keySet()); |
191 |
| - return new SnapshotInfo( |
| 178 | + Client remoteClient = getRemoteClusterClient(); |
| 179 | + ClusterStateResponse response = remoteClient.admin() |
| 180 | + .cluster() |
| 181 | + .prepareState() |
| 182 | + .clear() |
| 183 | + .setMetadata(true) |
| 184 | + .setNodes(true) |
| 185 | + .get(ccrSettings.getRecoveryActionTimeout()); |
| 186 | + Metadata responseMetadata = response.getState().metadata(); |
| 187 | + Map<String, IndexMetadata> indicesMap = responseMetadata.indices(); |
| 188 | + List<String> indices = new ArrayList<>(indicesMap.keySet()); |
| 189 | + |
| 190 | + // fork to the snapshot meta pool because the context expects to run on it and asserts that it does |
| 191 | + threadPool.executor(ThreadPool.Names.SNAPSHOT_META) |
| 192 | + .execute( |
| 193 | + () -> context.onResponse( |
| 194 | + new SnapshotInfo( |
192 | 195 | new Snapshot(this.metadata.name(), SNAPSHOT_ID),
|
193 | 196 | indices,
|
194 | 197 | new ArrayList<>(responseMetadata.dataStreams().keySet()),
|
195 | 198 | Collections.emptyList(),
|
196 | 199 | response.getState().getNodes().getMaxNodeVersion(),
|
197 | 200 | SnapshotState.SUCCESS
|
198 |
| - ); |
199 |
| - }), false)); |
200 |
| - } catch (Exception e) { |
201 |
| - context.onFailure(e); |
202 |
| - } |
| 201 | + ) |
| 202 | + ) |
| 203 | + ); |
203 | 204 | }
|
204 | 205 |
|
205 | 206 | @Override
|
@@ -257,42 +258,44 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
|
257 | 258 |
|
258 | 259 | @Override
|
259 | 260 | public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
260 |
| - try { |
261 |
| - getRemoteClusterClient().admin().cluster().prepareState().clear().setMetadata(true).execute(listener.map(response -> { |
262 |
| - final Metadata remoteMetadata = response.getState().getMetadata(); |
263 |
| - final String[] concreteAllIndices = remoteMetadata.getConcreteAllIndices(); |
264 |
| - final Map<String, SnapshotId> copiedSnapshotIds = Maps.newMapWithExpectedSize(concreteAllIndices.length); |
265 |
| - final Map<String, RepositoryData.SnapshotDetails> snapshotsDetails = Maps.newMapWithExpectedSize(concreteAllIndices.length); |
266 |
| - final Map<IndexId, List<SnapshotId>> indexSnapshots = Maps.newMapWithExpectedSize(concreteAllIndices.length); |
267 |
| - final Map<String, IndexMetadata> remoteIndices = remoteMetadata.getIndices(); |
268 |
| - for (String indexName : concreteAllIndices) { |
269 |
| - // Both the Snapshot name and UUID are set to _latest_ |
270 |
| - final SnapshotId snapshotId = new SnapshotId(LATEST, LATEST); |
271 |
| - copiedSnapshotIds.put(indexName, snapshotId); |
272 |
| - final long nowMillis = threadPool.absoluteTimeInMillis(); |
273 |
| - snapshotsDetails.put( |
274 |
| - indexName, |
275 |
| - new RepositoryData.SnapshotDetails(SnapshotState.SUCCESS, Version.CURRENT, nowMillis, nowMillis, "") |
276 |
| - ); |
277 |
| - indexSnapshots.put( |
278 |
| - new IndexId(indexName, remoteIndices.get(indexName).getIndex().getUUID()), |
279 |
| - Collections.singletonList(snapshotId) |
280 |
| - ); |
281 |
| - } |
282 |
| - return new RepositoryData( |
283 |
| - MISSING_UUID, |
284 |
| - 1, |
285 |
| - copiedSnapshotIds, |
286 |
| - snapshotsDetails, |
287 |
| - indexSnapshots, |
288 |
| - ShardGenerations.EMPTY, |
289 |
| - IndexMetaDataGenerations.EMPTY, |
290 |
| - MISSING_UUID |
| 261 | + ActionListener.completeWith(listener, () -> { |
| 262 | + Client remoteClient = getRemoteClusterClient(); |
| 263 | + ClusterStateResponse response = remoteClient.admin() |
| 264 | + .cluster() |
| 265 | + .prepareState() |
| 266 | + .clear() |
| 267 | + .setMetadata(true) |
| 268 | + .get(ccrSettings.getRecoveryActionTimeout()); |
| 269 | + Metadata remoteMetadata = response.getState().getMetadata(); |
| 270 | + |
| 271 | + Map<String, SnapshotId> copiedSnapshotIds = new HashMap<>(); |
| 272 | + Map<String, RepositoryData.SnapshotDetails> snapshotsDetails = Maps.newMapWithExpectedSize(copiedSnapshotIds.size()); |
| 273 | + Map<IndexId, List<SnapshotId>> indexSnapshots = Maps.newMapWithExpectedSize(copiedSnapshotIds.size()); |
| 274 | + |
| 275 | + Map<String, IndexMetadata> remoteIndices = remoteMetadata.getIndices(); |
| 276 | + for (String indexName : remoteMetadata.getConcreteAllIndices()) { |
| 277 | + // Both the Snapshot name and UUID are set to _latest_ |
| 278 | + SnapshotId snapshotId = new SnapshotId(LATEST, LATEST); |
| 279 | + copiedSnapshotIds.put(indexName, snapshotId); |
| 280 | + final long nowMillis = threadPool.absoluteTimeInMillis(); |
| 281 | + snapshotsDetails.put( |
| 282 | + indexName, |
| 283 | + new RepositoryData.SnapshotDetails(SnapshotState.SUCCESS, Version.CURRENT, nowMillis, nowMillis, "") |
291 | 284 | );
|
292 |
| - })); |
293 |
| - } catch (Exception e) { |
294 |
| - listener.onFailure(e); |
295 |
| - } |
| 285 | + Index index = remoteIndices.get(indexName).getIndex(); |
| 286 | + indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId)); |
| 287 | + } |
| 288 | + return new RepositoryData( |
| 289 | + MISSING_UUID, |
| 290 | + 1, |
| 291 | + copiedSnapshotIds, |
| 292 | + snapshotsDetails, |
| 293 | + indexSnapshots, |
| 294 | + ShardGenerations.EMPTY, |
| 295 | + IndexMetaDataGenerations.EMPTY, |
| 296 | + MISSING_UUID |
| 297 | + ); |
| 298 | + }); |
296 | 299 | }
|
297 | 300 |
|
298 | 301 | @Override
|
|
0 commit comments