Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TopNSearchTasksLogger settings to Cluster Settings (#6716) #7242

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
Expand Down Expand Up @@ -601,6 +602,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,
TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING,
TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING,
ClusterManagerTaskThrottler.THRESHOLD_SETTINGS,
ClusterManagerTaskThrottler.BASE_DELAY_SETTINGS,
ClusterManagerTaskThrottler.MAX_DELAY_SETTINGS,
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.Assertions;
Expand Down Expand Up @@ -860,6 +861,8 @@ protected Node(
settingsModule.getClusterSettings(),
taskHeaders
);
TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
this.extensionsManager.initializeServicesAndRestHandler(
actionModule,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void doRun() {
}

for (TaskCancellation taskCancellation : getTaskCancellations(cancellableTasks)) {
logger.debug(
logger.warn(
"[{} mode] cancelling task [{}] due to high resource consumption [{}]",
mode.getName(),
taskCancellation.getTask().getId(),
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpChannel;

Expand All @@ -69,6 +68,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -148,7 +148,11 @@ public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHea
this.taskHeaders = new ArrayList<>(taskHeaders);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.taskResourceConsumersEnabled = TASK_RESOURCE_CONSUMERS_ENABLED.get(settings);
this.taskResourceConsumer = Set.of(new TopNSearchTasksLogger(settings));
taskResourceConsumer = new HashSet<>();
}

public void registerTaskResourceConsumer(Consumer<Task> consumer) {
taskResourceConsumer.add(consumer);
}

public void setTaskResultsService(TaskResultsService taskResultsService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -30,36 +31,41 @@
*/
public class TopNSearchTasksLogger implements Consumer<Task> {
public static final String TASK_DETAILS_LOG_PREFIX = "task.detailslog";
public static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size";
public static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency";
private static final String LOG_TOP_QUERIES_SIZE = "cluster.task.consumers.top_n.size";
private static final String LOG_TOP_QUERIES_FREQUENCY = "cluster.task.consumers.top_n.frequency";

private static final Logger SEARCH_TASK_DETAILS_LOGGER = LogManager.getLogger(TASK_DETAILS_LOG_PREFIX + ".search");

// number of memory expensive search tasks that are logged
private static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
public static final Setting<Integer> LOG_TOP_QUERIES_SIZE_SETTING = Setting.intSetting(
LOG_TOP_QUERIES_SIZE,
10,
1,
100,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// frequency in which memory expensive search tasks are logged
private static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
public static final Setting<TimeValue> LOG_TOP_QUERIES_FREQUENCY_SETTING = Setting.timeSetting(
LOG_TOP_QUERIES_FREQUENCY,
TimeValue.timeValueSeconds(60L),
TimeValue.timeValueSeconds(60L),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final int topQueriesSize;
private final long topQueriesLogFrequencyInNanos;
private volatile int topQueriesSize;
private volatile long topQueriesLogFrequencyInNanos;
private final Queue<Tuple<Long, SearchShardTask>> topQueries;
private long lastReportedTimeInNanos = System.nanoTime();

public TopNSearchTasksLogger(Settings settings) {
public TopNSearchTasksLogger(Settings settings, ClusterSettings clusterSettings) {
this.topQueriesSize = LOG_TOP_QUERIES_SIZE_SETTING.get(settings);
this.topQueriesLogFrequencyInNanos = LOG_TOP_QUERIES_FREQUENCY_SETTING.get(settings).getNanos();
this.topQueries = new PriorityQueue<>(topQueriesSize, Comparator.comparingLong(Tuple::v1));
clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_SIZE_SETTING, this::setLogTopQueriesSize);
clusterSettings.addSettingsUpdateConsumer(LOG_TOP_QUERIES_FREQUENCY_SETTING, this::setTopQueriesLogFrequencyInNanos);
}

/**
Expand All @@ -78,11 +84,12 @@ private synchronized void recordSearchTask(SearchShardTask searchTask) {
publishTopNEvents();
lastReportedTimeInNanos = System.nanoTime();
}
if (topQueries.size() >= topQueriesSize && topQueries.peek().v1() < memory_in_bytes) {
int topQSize = topQueriesSize;
if (topQueries.size() >= topQSize && topQueries.peek().v1() < memory_in_bytes) {
// evict the element
topQueries.poll();
}
if (topQueries.size() < topQueriesSize) {
if (topQueries.size() < topQSize) {
topQueries.offer(new Tuple<>(memory_in_bytes, searchTask));
}
}
Expand All @@ -97,4 +104,12 @@ private void logTopResourceConsumingQueries() {
SEARCH_TASK_DETAILS_LOGGER.info(new SearchShardTaskDetailsLogMessage(topQuery.v2()));
}
}

private void setLogTopQueriesSize(int topQueriesSize) {
this.topQueriesSize = topQueriesSize;
}

void setTopQueriesLogFrequencyInNanos(TimeValue timeValue) {
this.topQueriesLogFrequencyInNanos = timeValue.getNanos();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.MockAppender;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.ResourceStats;
import org.opensearch.tasks.ResourceStatsType;
import org.opensearch.tasks.ResourceUsageMetric;
Expand All @@ -26,8 +29,9 @@

import java.util.Collections;

import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY;
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE;
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_SIZE_SETTING;
import static org.opensearch.tasks.consumer.TopNSearchTasksLogger.LOG_TOP_QUERIES_FREQUENCY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class TopNSearchTasksLoggerTests extends OpenSearchSingleNodeTestCase {
static MockAppender appender;
Expand All @@ -42,15 +46,34 @@ public static void init() throws IllegalAccessException {
Loggers.addAppender(searchLogger, appender);
}

@After
public void cleanupAfterTest() {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().putNull("*"))
.setTransientSettings(Settings.builder().putNull("*"))
);
}

@AfterClass
public static void cleanup() {
Loggers.removeAppender(searchLogger, appender);
appender.stop();
}

public void testLoggerWithTasks() {
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "0ms").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
final Settings settings = Settings.builder()
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s")
.build();
topNSearchTasksLogger = new TopNSearchTasksLogger(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
// This setting overrides is just for testing purpose
topNSearchTasksLogger.setTopQueriesLogFrequencyInNanos(TimeValue.timeValueMillis(0));
generateTasks(5);
LogEvent logEvent = appender.getLastEventAndReset();
assertNotNull(logEvent);
Expand All @@ -59,16 +82,28 @@ public void testLoggerWithTasks() {
}

public void testLoggerWithoutTasks() {
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "500ms").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
final Settings settings = Settings.builder()
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "60s")
.build();
topNSearchTasksLogger = new TopNSearchTasksLogger(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

assertNull(appender.getLastEventAndReset());
}

public void testLoggerWithHighFrequency() {
// setting the frequency to a really large value and confirming that nothing gets written to log file.
final Settings settings = Settings.builder().put(LOG_TOP_QUERIES_SIZE, 1).put(LOG_TOP_QUERIES_FREQUENCY, "10m").build();
topNSearchTasksLogger = new TopNSearchTasksLogger(settings);
final Settings settings = Settings.builder()
.put(LOG_TOP_QUERIES_SIZE_SETTING.getKey(), 1)
.put(LOG_TOP_QUERIES_FREQUENCY_SETTING.getKey(), "10m")
.build();
topNSearchTasksLogger = new TopNSearchTasksLogger(
settings,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
generateTasks(5);
generateTasks(2);

Expand Down