Skip to content

Commit

Permalink
Merge branch 'createpitservice' of github.com:bharath-techie/OpenSear…
Browse files Browse the repository at this point in the history
…ch into deletepitservice
  • Loading branch information
bharath-techie committed Jul 15, 2022
2 parents f20dbdc + fa9946b commit 8f673c5
Show file tree
Hide file tree
Showing 14 changed files with 440 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ public class CreatePitController {
private final PitService pitService;
private static final Logger logger = LogManager.getLogger(CreatePitController.class);
public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting(
"pit.init.keep_alive",
timeValueSeconds(30),
Setting.Property.NodeScope
"pit.init.keep_alive",
timeValueSeconds(30),
Setting.Property.NodeScope
);

public CreatePitController(
CreatePitRequest request,
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry,
Task task,
ActionListener<CreatePitResponse> listener,
PitService pitService
CreatePitRequest request,
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry,
Task task,
ActionListener<CreatePitResponse> listener,
PitService pitService
) {
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
Expand All @@ -94,11 +94,11 @@ public void executeCreatePit(StepListener<SearchResponse> createPitListener, Act
searchRequest.indicesOptions(request.getIndicesOptions());
searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation());
SearchTask searchTask = searchRequest.createTask(
task.getId(),
task.getType(),
task.getAction(),
task.getParentTaskId(),
Collections.emptyMap()
task.getId(),
task.getType(),
task.getAction(),
task.getParentTaskId(),
Collections.emptyMap()
);
/**
* Phase 1 of create PIT
Expand All @@ -109,8 +109,8 @@ public void executeCreatePit(StepListener<SearchResponse> createPitListener, Act
* Phase 2 of create PIT where we update pit id in pit contexts
*/
createPitListener.whenComplete(
searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); },
updatePitIdListener::onFailure
searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); },
updatePitIdListener::onFailure
);
}

Expand All @@ -119,108 +119,108 @@ public void executeCreatePit(StepListener<SearchResponse> createPitListener, Act
*/
void executeCreatePit(Task task, SearchRequest searchRequest, StepListener<SearchResponse> createPitListener) {
logger.debug(
() -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices()))
() -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices()))
);
transportSearchAction.executeRequest(
task,
searchRequest,
TransportCreatePitAction.CREATE_PIT_ACTION,
true,
new TransportSearchAction.SinglePhaseSearchAction() {
@Override
public void executeOnShardTarget(
SearchTask searchTask,
SearchShardTarget target,
Transport.Connection connection,
ActionListener<SearchPhaseResult> searchPhaseResultActionListener
) {
searchTransportService.createPitContext(
connection,
new TransportCreatePitAction.CreateReaderContextRequest(
target.getShardId(),
PIT_INIT_KEEP_ALIVE.get(clusterService.getSettings())
),
searchTask,
ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure)
);
}
},
createPitListener
task,
searchRequest,
TransportCreatePitAction.CREATE_PIT_ACTION,
true,
new TransportSearchAction.SinglePhaseSearchAction() {
@Override
public void executeOnShardTarget(
SearchTask searchTask,
SearchShardTarget target,
Transport.Connection connection,
ActionListener<SearchPhaseResult> searchPhaseResultActionListener
) {
searchTransportService.createPitContext(
connection,
new TransportCreatePitAction.CreateReaderContextRequest(
target.getShardId(),
PIT_INIT_KEEP_ALIVE.get(clusterService.getSettings())
),
searchTask,
ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure)
);
}
},
createPitListener
);
}

/**
* Updates PIT ID, keep alive and createdTime of PIT reader context
*/
void executeUpdatePitId(
CreatePitRequest request,
SearchRequest searchRequest,
SearchResponse searchResponse,
ActionListener<CreatePitResponse> updatePitIdListener
CreatePitRequest request,
SearchRequest searchRequest,
SearchResponse searchResponse,
ActionListener<CreatePitResponse> updatePitIdListener
) {
logger.debug(
() -> new ParameterizedMessage(
"Updating PIT context with PIT ID [{}], creation time and keep alive",
searchResponse.pointInTimeId()
)
() -> new ParameterizedMessage(
"Updating PIT context with PIT ID [{}], creation time and keep alive",
searchResponse.pointInTimeId()
)
);
/**
* store the create time ( same create time for all PIT contexts across shards ) to be used
* for list PIT api
*/
final long relativeStartNanos = System.nanoTime();
final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
searchRequest.getOrCreateAbsoluteStartMillis(),
relativeStartNanos,
System::nanoTime
searchRequest.getOrCreateAbsoluteStartMillis(),
relativeStartNanos,
System::nanoTime
);
final long creationTime = timeProvider.getAbsoluteStartMillis();
CreatePitResponse createPITResponse = new CreatePitResponse(
searchResponse.pointInTimeId(),
creationTime,
searchResponse.getTotalShards(),
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
searchResponse.getFailedShards(),
searchResponse.getShardFailures()
searchResponse.pointInTimeId(),
creationTime,
searchResponse.getTotalShards(),
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
searchResponse.getFailedShards(),
searchResponse.getShardFailures()
);
SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getConnectionLookupListener(contextId);
lookupListener.whenComplete(nodelookup -> {
final ActionListener<UpdatePitContextResponse> groupedActionListener = getGroupedListener(
updatePitIdListener,
createPITResponse,
contextId.shards().size(),
contextId.shards().values()
updatePitIdListener,
createPITResponse,
contextId.shards().size(),
contextId.shards().values()
);
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
try {
final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node);
searchTransportService.updatePitContext(
connection,
new UpdatePitContextRequest(
entry.getValue().getSearchContextId(),
createPITResponse.getId(),
request.getKeepAlive().millis(),
creationTime
),
groupedActionListener
connection,
new UpdatePitContextRequest(
entry.getValue().getSearchContextId(),
createPITResponse.getId(),
request.getKeepAlive().millis(),
creationTime
),
groupedActionListener
);
} catch (Exception e) {
logger.error(
() -> new ParameterizedMessage(
"Create pit update phase failed for PIT ID [{}] on node [{}]",
searchResponse.pointInTimeId(),
node
),
e
() -> new ParameterizedMessage(
"Create pit update phase failed for PIT ID [{}] on node [{}]",
searchResponse.pointInTimeId(),
node
),
e
);
groupedActionListener.onFailure(
new OpenSearchException(
"Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]",
e
)
new OpenSearchException(
"Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]",
e
)
);
}
}
Expand All @@ -230,19 +230,19 @@ void executeUpdatePitId(
private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(SearchContextId contextId) {
ClusterState state = clusterService.state();
final Set<String> clusters = contextId.shards()
.values()
.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());
.values()
.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());
return SearchUtils.getConnectionLookupListener(searchTransportService.getRemoteClusterService(), state, clusters);
}

private ActionListener<UpdatePitContextResponse> getGroupedListener(
ActionListener<CreatePitResponse> updatePitIdListener,
CreatePitResponse createPITResponse,
int size,
Collection<SearchContextIdForNode> contexts
ActionListener<CreatePitResponse> updatePitIdListener,
CreatePitResponse createPITResponse,
int size,
Collection<SearchContextIdForNode> contexts
) {
return new GroupedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -268,16 +268,16 @@ public void onResponse(DeletePitResponse response) {
// this is invoke and forget call
final StringBuilder failedPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> !r.isSuccessful())
.forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(","));
.stream()
.filter(r -> !r.isSuccessful())
.forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(","));
logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString()));
if (!logger.isDebugEnabled()) return;
final StringBuilder successfulPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> r.isSuccessful())
.forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(","));
.stream()
.filter(r -> r.isSuccessful())
.forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(","));
logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ public class DeletePitAction extends ActionType<DeletePitResponse> {
private DeletePitAction() {
super(NAME, DeletePitResponse::new);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ public void writeTo(StreamOutput out) throws IOException {
}

static final ConstructingObjectParser<DeletePitInfo, Void> PARSER = new ConstructingObjectParser<>(
"delete_pit_info",
true,
args -> new DeletePitInfo((boolean) args[0], (String) args[1])
"delete_pit_info",
true,
args -> new DeletePitInfo((boolean) args[0], (String) args[1])
);

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void fromXContent(XContentParser parser) throws IOException {
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
}

private static final ConstructingObjectParser<DeletePitResponse, Void> PARSER = new ConstructingObjectParser<>(
"delete_pit_response",
true,
(Object[] parsedObjects) -> {
@SuppressWarnings("unchecked")
List<DeletePitInfo> deletePitInfoList = (List<DeletePitInfo>) parsedObjects[0];
return new DeletePitResponse(deletePitInfoList);
}
"delete_pit_response",
true,
(Object[] parsedObjects) -> {
@SuppressWarnings("unchecked")
List<DeletePitInfo> deletePitInfoList = (List<DeletePitInfo>) parsedObjects[0];
return new DeletePitResponse(deletePitInfoList);
}
);
static {
PARSER.declareObjectArray(constructorArg(), DeletePitInfo.PARSER, new ParseField("pits"));
Expand Down
26 changes: 13 additions & 13 deletions server/src/main/java/org/opensearch/action/search/PitService.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,32 @@ public PitService(ClusterService clusterService, SearchTransportService searchTr
* Delete list of pit contexts. Returns the details of success of operation per PIT ID.
*/
public void deletePitContexts(
Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap,
ActionListener<DeletePitResponse> listener
Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap,
ActionListener<DeletePitResponse> listener
) {
final Set<String> clusters = nodeToContextsMap.values()
.stream()
.flatMap(Collection::stream)
.filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false)
.map(c -> c.getSearchContextIdForNode().getClusterAlias())
.collect(Collectors.toSet());
.stream()
.flatMap(Collection::stream)
.filter(ctx -> Strings.isEmpty(ctx.getSearchContextIdForNode().getClusterAlias()) == false)
.map(c -> c.getSearchContextIdForNode().getClusterAlias())
.collect(Collectors.toSet());
StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = SearchUtils.getConnectionLookupListener(
searchTransportService.getRemoteClusterService(),
clusterService.state(),
clusters
searchTransportService.getRemoteClusterService(),
clusterService.state(),
clusters
);
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<DeletePitResponse> groupedListener = getDeletePitGroupedListener(
listener,
nodeToContextsMap.size()
listener,
nodeToContextsMap.size()
);

for (Map.Entry<String, List<PitSearchContextIdForNode>> entry : nodeToContextsMap.entrySet()) {
String clusterAlias = entry.getValue().get(0).getSearchContextIdForNode().getClusterAlias();
final DiscoveryNode node = nodeLookup.apply(clusterAlias, entry.getValue().get(0).getSearchContextIdForNode().getNode());
if (node == null) {
logger.error(
() -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode())
() -> new ParameterizedMessage("node [{}] not found", entry.getValue().get(0).getSearchContextIdForNode().getNode())
);
List<DeletePitInfo> deletePitInfos = new ArrayList<>();
for (PitSearchContextIdForNode pitSearchContextIdForNode : entry.getValue()) {
Expand Down
Loading

0 comments on commit 8f673c5

Please sign in to comment.