Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAK-49153 Search allow a sakai node to be a true client #11808

Merged
merged 1 commit into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions search/elasticsearch/impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.sakaiproject.search.elasticsearch;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ServerSocket;
Expand All @@ -33,28 +34,28 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.lucene.search.TotalHits;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.analysis.common.CommonAnalysisPlugin;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.network.InetAddresses;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.sakaiproject.search.api.SiteSearchIndexBuilder;
import org.sakaiproject.search.api.TermFrequency;
import org.sakaiproject.search.elasticsearch.filter.SearchItemFilter;
import org.sakaiproject.search.elasticsearch.serialization.NodeStatsResponseFactory;
import org.sakaiproject.search.model.SearchBuilderItem;
import org.sakaiproject.thread_local.api.ThreadLocalManager;
import org.sakaiproject.tool.api.SessionManager;
Expand Down Expand Up @@ -132,6 +134,7 @@ public class ElasticSearchService implements SearchService {

/* injected dependencies */
private List<String> triggerFunctions = Lists.newArrayListWithCapacity(0);
private NodeStatsResponseFactory nodeStatsResponseFactory;
private NotificationService notificationService;
private ServerConfigurationService serverConfigurationService;
private ThreadLocalManager threadLocalManager;
Expand All @@ -155,19 +158,20 @@ public EmbeddedElasticSearchNode(Settings preparedSettings, Map<String, String>

public void init() {
if (!isEnabled()) {
log.info("ElasticSearch is not enabled. Set search.enable=true to change that.");
log.info("Search is not enabled. Set search.enable=true to change that.");
return;
}
initializeElasticSearch();
initializeSearch();
nodeStatsResponseFactory = new NodeStatsResponseFactory();
}

protected void initializeElasticSearch() {
final Settings settings = initializeElasticSearchSettings();
if (node == null) node = initializeElasticSearchNode(settings);
if (client == null) client = initializeElasticSearchClient(settings);
protected void initializeSearch() {
final Settings settings = initializeSearchSettings();
if (node == null && !clientNode) node = initializeSearchNode(settings);
if (client == null) client = initializeSearchClient(settings);
}

protected Settings initializeElasticSearchSettings() {
protected Settings initializeSearchSettings() {
Settings.Builder settingsBuilder = Settings.builder();

// load anything set into the ServerConfigurationService that starts with "elasticsearch."
Expand Down Expand Up @@ -196,8 +200,6 @@ protected Settings initializeElasticSearchSettings() {
settingsBuilder.put("path.data", serverConfigurationService.getSakaiHomePath() + "/elasticsearch/" + settingsBuilder.get("node.name"));
}

log.info("Setting ElasticSearch storage area to [" + settingsBuilder.get("path.data") + "]");

String host = settingsBuilder.get("http.host");
String port = settingsBuilder.get("http.port");
String transportPort = settingsBuilder.get("transport.port");
Expand Down Expand Up @@ -225,12 +227,7 @@ protected Settings initializeElasticSearchSettings() {

settingsBuilder.put("transport.type", "netty4");

log.info("Elasticsearch configured with home=[{}], node=[{}], cluster name=[{}], http port=[{}], discovery port=[{}]",
settingsBuilder.get("path.home"),
settingsBuilder.get("node.name"),
settingsBuilder.get("cluster.name"),
settingsBuilder.get("http.port"),
settingsBuilder.get("transport.port"));
clientNode = serverConfigurationService.getBoolean("search.clientNode", false);

return settingsBuilder.build();
}
Expand All @@ -239,10 +236,10 @@ public String getNodeName() {
if (node != null) {
return node.settings().get("node.name");
}
return null;
return "";
}

protected Node initializeElasticSearchNode(Settings settings) {
protected Node initializeSearchNode(Settings settings) {
Collection<Class<? extends Plugin>> plugins = Arrays.asList(
Netty4Plugin.class,
CommonAnalysisPlugin.class);
Expand All @@ -253,29 +250,36 @@ protected Node initializeElasticSearchNode(Settings settings) {
Supplier<String> nodeName = () -> settings.get("node.name");
Node node = new EmbeddedElasticSearchNode(settings, systemProperties, nodeName, plugins);

log.info("Search node configured with node=[{}], cluster name=[{}], http port=[{}], discovery port=[{}], home=[{}], data=[{}]",
settings.get("node.name"),
settings.get("cluster.name"),
settings.get("http.port"),
settings.get("transport.port"),
settings.get("path.home"),
settings.get("path.data"));

try {
log.info("elasticsearch starting embedded node, {}", node.settings().toString());
node.start();
} catch (NodeValidationException nve) {
log.error("Could not start embedded elasticsearch node, {}", nve.toString());
log.error("Could not start embedded Search node, {}", nve.toString());
return null;
}

return node;
}

protected RestHighLevelClient initializeElasticSearchClient(Settings settings) {
protected RestHighLevelClient initializeSearchClient(Settings settings) {
String host = settings.get("http.host", "localhost");
int port = settings.getAsInt("http.port", 9200);
HttpHost httpHost = new HttpHost(host, port);
log.info("elasticsearch rest high level client configured with: {}", httpHost.toHostString());
log.info("Search rest high level client configured with: {}", httpHost.toHostString());
return new RestHighLevelClient(RestClient.builder(httpHost));
}

private int findAvailableTcpPort(String host, int port) {
try (ServerSocket serverSocket = new ServerSocket(port, 0, InetAddresses.forString(host))) {
if (serverSocket.getLocalPort() != port) {
throw new IOException("Port " + port + " is in use and can't be used by elasticsearch");
throw new IOException("Port " + port + " is in use and can't be used by search");
}
return port;
} catch (IOException ioe) {
Expand All @@ -286,7 +290,7 @@ private int findAvailableTcpPort(String host, int port) {

public void registerIndexBuilder(ElasticSearchIndexBuilder indexBuilder) {
if (!isEnabled()) {
log.info("ElasticSearch is not enabled. Skipping registration request from index builder ["
log.info("Search is not enabled. Skipping registration request from index builder ["
+ indexBuilder.getName() + "]. Set search.enable=true to change that.");
return;
}
Expand Down Expand Up @@ -390,7 +394,7 @@ private void handleNewGlobalContentFunction(String function) {
log.info("Register " + function + " as a trigger for the search service");

if (!isEnabled()) {
log.debug("ElasticSearch is not enabled. Set search.enable=true to change that.");
log.debug("Search is not enabled. Set search.enable=true to change that.");
return;
}

Expand Down Expand Up @@ -725,15 +729,18 @@ public List<SearchBuilderItem> getGlobalMasterSearchItems() {

@Override
public List<SearchStatus> getSearchStatus() {
final List<SearchStatus> indexBuilderStatuses = Lists.newArrayList();
final List<SearchStatus> indexBuilderStatuses = new ArrayList<>();
forEachRegisteredIndexBuilder(i -> indexBuilderStatuses.add(i.getSearchStatus()));

final NodesStatsResponse nodesStatsResponse = getNodesStats();

return indexBuilderStatuses
.stream()
.map(s -> newSearchStatusWrapper(s, nodesStatsResponse))
.collect(Collectors.toList());
if (nodesStatsResponse != null) {
return indexBuilderStatuses
.stream()
.map(s -> newSearchStatusWrapper(s, nodesStatsResponse))
.collect(Collectors.toList());
}
return Collections.emptyList();
}


Expand Down Expand Up @@ -792,16 +799,31 @@ public String getPDocuments() {
}

protected NodesStatsResponse getNodesStats() {
final NodesInfoResponse nodesInfoResponse = node.client().admin().cluster().nodesInfo(new NodesInfoRequest()).actionGet();
final String[] nodes = new String[nodesInfoResponse.getNodes().size()];

int i = 0;

for (NodeInfo nodeInfo : nodesInfoResponse.getNodes()) {
nodes[i++] = nodeInfo.getNode().getName();
if (node != null) {
return node.client().admin().cluster().nodesStats(new NodesStatsRequest()).actionGet();
} else {
Request request = new Request("GET", NodeStatsResponseFactory.NODE_STATS_INDICES_API);
try {
Response response = client.getLowLevelClient().performRequest(request);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode == 200) {
HttpEntity httpEntity = response.getEntity();
if (httpEntity != null) {
NodesStatsResponse nodesStatsResponse = nodeStatsResponseFactory.createNodeStatsIndicesFromJSON(httpEntity.getContent());
log.debug("Response for request [{}], status [{}], result:\n{}", response.getRequestLine(), statusCode, nodesStatsResponse);
return nodesStatsResponse;
} else {
log.warn("Response for request [{}], status [{}], warnings [{}], contained no data", response.getRequestLine(), statusCode, response.getWarnings());
}
} else {
log.warn("Response for request [{}], status [{}], warnings [{}]", response.getRequestLine(), statusCode, response.getWarnings());
}
} catch (Exception e) {
log.warn("Failed retrieving node stats from clsuter, {}", e);
}
}

return node.client().admin().cluster().nodesStats(new NodesStatsRequest(nodes)).actionGet();
return null;
}

protected void forEachRegisteredIndexBuilder(Consumer<ElasticSearchIndexBuilder> consumer) {
Expand All @@ -824,7 +846,7 @@ public void forceReload() {

@Override
public TermFrequency getTerms(int documentId) throws IOException {
throw new UnsupportedOperationException("ElasticSearch can't does not support this operation at this time.");
throw new UnsupportedOperationException("Search can't does not support this operation at this time.");
}

@Override
Expand Down Expand Up @@ -884,7 +906,7 @@ public void destroy(){
try {
node.close();
} catch (IOException ioe) {
log.error("Error shutting down elasticsearch");
log.error("Error shutting down search");
}
node = null;
}
Expand Down
Loading