@@ -391,10 +391,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue(
391
391
return request ;
392
392
}
393
393
394
- private static boolean isPartOfPIT (SearchRequest request , ShardSearchContextId contextId ) {
394
+ private static boolean isPartOfPIT (
395
+ SearchRequest request ,
396
+ ShardSearchContextId contextId ,
397
+ NamedWriteableRegistry namedWriteableRegistry
398
+ ) {
395
399
final PointInTimeBuilder pointInTimeBuilder = request .pointInTimeBuilder ();
396
400
if (pointInTimeBuilder != null ) {
397
- return request .pointInTimeBuilder ().getSearchContextId (null ).contains (contextId );
401
+ return request .pointInTimeBuilder ().getSearchContextId (namedWriteableRegistry ).contains (contextId );
398
402
} else {
399
403
return false ;
400
404
}
@@ -546,7 +550,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
546
550
static void registerNodeSearchAction (
547
551
SearchTransportService searchTransportService ,
548
552
SearchService searchService ,
549
- SearchPhaseController searchPhaseController
553
+ SearchPhaseController searchPhaseController ,
554
+ NamedWriteableRegistry namedWriteableRegistry
550
555
) {
551
556
var transportService = searchTransportService .transportService ();
552
557
var threadPool = transportService .getThreadPool ();
@@ -576,7 +581,8 @@ static void registerNodeSearchAction(
576
581
request ,
577
582
cancellableTask ,
578
583
channel ,
579
- dependencies
584
+ dependencies ,
585
+ namedWriteableRegistry
580
586
);
581
587
// TODO: log activating or otherwise limiting parallelism might be helpful here
582
588
for (int i = 0 ; i < workers ; i ++) {
@@ -587,12 +593,17 @@ static void registerNodeSearchAction(
587
593
TransportActionProxy .registerProxyAction (transportService , NODE_SEARCH_ACTION_NAME , true , NodeQueryResponse ::new );
588
594
}
589
595
590
- private static void releaseLocalContext (SearchService searchService , NodeQueryRequest request , SearchPhaseResult result ) {
596
+ private static void releaseLocalContext (
597
+ SearchService searchService ,
598
+ NodeQueryRequest request ,
599
+ SearchPhaseResult result ,
600
+ NamedWriteableRegistry namedWriteableRegistry
601
+ ) {
591
602
var phaseResult = result .queryResult () != null ? result .queryResult () : result .rankFeatureResult ();
592
603
if (phaseResult != null
593
604
&& phaseResult .hasSearchContext ()
594
605
&& request .searchRequest .scroll () == null
595
- && isPartOfPIT (request .searchRequest , phaseResult .getContextId ()) == false ) {
606
+ && isPartOfPIT (request .searchRequest , phaseResult .getContextId (), namedWriteableRegistry ) == false ) {
596
607
searchService .freeReaderContext (phaseResult .getContextId ());
597
608
}
598
609
}
@@ -736,13 +747,15 @@ private static final class QueryPerNodeState {
736
747
private final CountDown countDown ;
737
748
private final TransportChannel channel ;
738
749
private volatile BottomSortValuesCollector bottomSortCollector ;
750
+ private final NamedWriteableRegistry namedWriteableRegistry ;
739
751
740
752
private QueryPerNodeState (
741
753
QueryPhaseResultConsumer queryPhaseResultConsumer ,
742
754
NodeQueryRequest searchRequest ,
743
755
CancellableTask task ,
744
756
TransportChannel channel ,
745
- Dependencies dependencies
757
+ Dependencies dependencies ,
758
+ NamedWriteableRegistry namedWriteableRegistry
746
759
) {
747
760
this .queryPhaseResultConsumer = queryPhaseResultConsumer ;
748
761
this .searchRequest = searchRequest ;
@@ -752,6 +765,7 @@ private QueryPerNodeState(
752
765
this .countDown = new CountDown (queryPhaseResultConsumer .getNumShards ());
753
766
this .channel = channel ;
754
767
this .dependencies = dependencies ;
768
+ this .namedWriteableRegistry = namedWriteableRegistry ;
755
769
}
756
770
757
771
void onShardDone () {
@@ -762,7 +776,7 @@ void onShardDone() {
762
776
try (queryPhaseResultConsumer ) {
763
777
var failure = queryPhaseResultConsumer .failure .get ();
764
778
if (failure != null ) {
765
- handleMergeFailure (failure , channelListener );
779
+ handleMergeFailure (failure , channelListener , namedWriteableRegistry );
766
780
return ;
767
781
}
768
782
final QueryPhaseResultConsumer .MergeResult mergeResult ;
@@ -772,7 +786,7 @@ void onShardDone() {
772
786
EMPTY_PARTIAL_MERGE_RESULT
773
787
);
774
788
} catch (Exception e ) {
775
- handleMergeFailure (e , channelListener );
789
+ handleMergeFailure (e , channelListener , namedWriteableRegistry );
776
790
return ;
777
791
}
778
792
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -799,7 +813,7 @@ void onShardDone() {
799
813
&& q .hasSuggestHits () == false
800
814
&& q .getRankShardResult () == null
801
815
&& searchRequest .searchRequest .scroll () == null
802
- && isPartOfPIT (searchRequest .searchRequest , q .getContextId ()) == false ) {
816
+ && isPartOfPIT (searchRequest .searchRequest , q .getContextId (), namedWriteableRegistry ) == false ) {
803
817
if (dependencies .searchService .freeReaderContext (q .getContextId ())) {
804
818
q .clearContextId ();
805
819
}
@@ -816,9 +830,20 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
816
830
}
817
831
}
818
832
819
- private void handleMergeFailure (Exception e , ChannelActionListener <TransportResponse > channelListener ) {
833
+ private void handleMergeFailure (
834
+ Exception e ,
835
+ ChannelActionListener <TransportResponse > channelListener ,
836
+ NamedWriteableRegistry namedWriteableRegistry
837
+ ) {
820
838
queryPhaseResultConsumer .getSuccessfulResults ()
821
- .forEach (searchPhaseResult -> releaseLocalContext (dependencies .searchService , searchRequest , searchPhaseResult ));
839
+ .forEach (
840
+ searchPhaseResult -> releaseLocalContext (
841
+ dependencies .searchService ,
842
+ searchRequest ,
843
+ searchPhaseResult ,
844
+ namedWriteableRegistry
845
+ )
846
+ );
822
847
channelListener .onFailure (e );
823
848
}
824
849
0 commit comments