5
5
*/
6
6
package org .elasticsearch .xpack .ccr .action ;
7
7
8
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
8
9
import org .elasticsearch .action .ActionListener ;
9
10
import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
10
11
import org .elasticsearch .action .admin .indices .mapping .put .PutMappingRequest ;
21
22
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
22
23
import org .elasticsearch .common .xcontent .XContentType ;
23
24
import org .elasticsearch .index .Index ;
25
+ import org .elasticsearch .index .IndexNotFoundException ;
24
26
import org .elasticsearch .index .seqno .SeqNoStats ;
25
27
import org .elasticsearch .index .shard .ShardId ;
28
+ import org .elasticsearch .index .shard .ShardNotFoundException ;
26
29
import org .elasticsearch .index .translog .Translog ;
27
30
import org .elasticsearch .persistent .AllocatedPersistentTask ;
28
31
import org .elasticsearch .persistent .PersistentTaskState ;
@@ -164,9 +167,24 @@ interface BiLongConsumer {
164
167
protected void nodeOperation (final AllocatedPersistentTask task , final ShardFollowTask params , final PersistentTaskState state ) {
165
168
Client followerClient = wrapClient (client , params .getHeaders ());
166
169
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask ) task ;
167
- logger .info ("{} Started to track leader shard {}" , params .getFollowShardId (), params .getLeaderShardId ());
168
- fetchGlobalCheckpoint (followerClient , params .getFollowShardId (),
169
- (followerGCP , maxSeqNo ) -> shardFollowNodeTask .start (followerGCP , maxSeqNo , followerGCP , maxSeqNo ), task ::markAsFailed );
170
+ logger .info ("{} Starting to track leader shard {}" , params .getFollowShardId (), params .getLeaderShardId ());
171
+
172
+ BiLongConsumer handler = (followerGCP , maxSeqNo ) -> shardFollowNodeTask .start (followerGCP , maxSeqNo , followerGCP , maxSeqNo );
173
+ Consumer <Exception > errorHandler = e -> {
174
+ if (shardFollowNodeTask .isStopped ()) {
175
+ return ;
176
+ }
177
+
178
+ if (ShardFollowNodeTask .shouldRetry (e )) {
179
+ logger .debug (new ParameterizedMessage ("failed to fetch follow shard global {} checkpoint and max sequence number" ,
180
+ shardFollowNodeTask ), e );
181
+ threadPool .schedule (params .getMaxRetryDelay (), Ccr .CCR_THREAD_POOL_NAME , () -> nodeOperation (task , params , state ));
182
+ } else {
183
+ shardFollowNodeTask .markAsFailed (e );
184
+ }
185
+ };
186
+
187
+ fetchGlobalCheckpoint (followerClient , params .getFollowShardId (), handler , errorHandler );
170
188
}
171
189
172
190
private void fetchGlobalCheckpoint (
@@ -176,6 +194,11 @@ private void fetchGlobalCheckpoint(
176
194
final Consumer <Exception > errorHandler ) {
177
195
client .admin ().indices ().stats (new IndicesStatsRequest ().indices (shardId .getIndexName ()), ActionListener .wrap (r -> {
178
196
IndexStats indexStats = r .getIndex (shardId .getIndexName ());
197
+ if (indexStats == null ) {
198
+ errorHandler .accept (new IndexNotFoundException (shardId .getIndex ()));
199
+ return ;
200
+ }
201
+
179
202
Optional <ShardStats > filteredShardStats = Arrays .stream (indexStats .getShards ())
180
203
.filter (shardStats -> shardStats .getShardRouting ().shardId ().equals (shardId ))
181
204
.filter (shardStats -> shardStats .getShardRouting ().primary ())
@@ -186,7 +209,7 @@ private void fetchGlobalCheckpoint(
186
209
final long maxSeqNo = seqNoStats .getMaxSeqNo ();
187
210
handler .accept (globalCheckpoint , maxSeqNo );
188
211
} else {
189
- errorHandler .accept (new IllegalArgumentException ( "Cannot find shard stats for shard " + shardId ));
212
+ errorHandler .accept (new ShardNotFoundException ( shardId ));
190
213
}
191
214
}, errorHandler ));
192
215
}
0 commit comments